diff --git a/bddtests/docker-compose-orderer-solo.yml b/bddtests/docker-compose-orderer-solo.yml index f762220b0b0..e5e1cc5efe3 100644 --- a/bddtests/docker-compose-orderer-solo.yml +++ b/bddtests/docker-compose-orderer-solo.yml @@ -1,12 +1,14 @@ orderer0: image: hyperledger/fabric-orderer environment: - - ORDERER_LISTEN_ADDRESS=0.0.0.0 - - ORDERER_LISTEN_PORT=5005 - #- ORDERER_WINDOW_SIZE_MAX=1000 # TODO (implement) - #- ORDERER_BATCH_TIMEOUT=10s # TODO (implement) - #- ORDERER_BATCH_SIZE=10 # TODO (implement) - #- ORDERER_BLOCK_HISTORY_SIZE=100 # TODO (implement) + - ORDERER_GENERAL_ORDERERTYPE=solo + - ORDERER_GENERAL_LEDGERTYPE=ram + - ORDERER_GENERAL_BATCHTIMEOUT=10s + - ORDERER_GENERAL_BATCHSIZE=10 + - ORDERER_GENERAL_MAXWINDOWSIZE=1000 + - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0 + - ORDERER_GENERAL_LISTENPORT=5005 + - ORDERER_RAMLEDGER_HISTORY_SIZE=100 working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer command: orderer diff --git a/orderer/config/config.go b/orderer/config/config.go new file mode 100644 index 00000000000..7daaf61b8da --- /dev/null +++ b/orderer/config/config.go @@ -0,0 +1,167 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/op/go-logging" + "github.com/spf13/viper" +) + +var logger = logging.MustGetLogger("orderer/config") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +// Prefix is the default config prefix for the orderer +const Prefix string = "ORDERER" + +// General contains config which should be common among all orderer types +type General struct { + OrdererType string + LedgerType string + BatchTimeout time.Duration + BatchSize uint + QueueSize uint + MaxWindowSize uint + ListenAddress string + ListenPort uint16 +} + +// RAMLedger contains config for the RAM ledger +type RAMLedger struct { + HistorySize uint +} + +// FileLedger contains config for the File ledger +type FileLedger struct { + Location string + Prefix string +} + +// TopLevel directly corresponds to the orderer config yaml +// Note, for non 1-1 mappings, you may append +// something like `mapstructure:"weirdFoRMat"` to +// modify the default mapping, see the "Unmarshal" +// section of https://github.com/spf13/viper for more info +type TopLevel struct { + General General + RAMLedger RAMLedger + FileLedger FileLedger +} + +var defaults = TopLevel{ + General: General{ + OrdererType: "solo", + LedgerType: "ram", + BatchTimeout: 10 * time.Second, + BatchSize: 10, + QueueSize: 1000, + MaxWindowSize: 1000, + ListenAddress: "127.0.0.1", + ListenPort: 5151, + }, + RAMLedger: RAMLedger{ + HistorySize: 10000, + }, + FileLedger: FileLedger{ + Location: "", + Prefix: "hyperledger-fabric-rawledger", + }, +} + +func (c *TopLevel) completeInitialization() { + defer logger.Infof("Validated configuration to: %+v", c) + + for { + switch { + case c.General.OrdererType == "": + logger.Infof("General.OrdererType unset, setting to %s", defaults.General.OrdererType) + c.General.OrdererType = defaults.General.OrdererType + case c.General.LedgerType == "": + logger.Infof("General.LedgerType unset, setting to %s", defaults.General.LedgerType) + c.General.LedgerType = defaults.General.LedgerType + case c.General.BatchTimeout == 0: + logger.Infof("General.BatchTimeout unset, setting to %s", defaults.General.BatchTimeout) + c.General.BatchTimeout = defaults.General.BatchTimeout + case c.General.BatchSize == 0: + logger.Infof("General.BatchSize unset, setting to %s", defaults.General.BatchSize) + c.General.BatchSize = defaults.General.BatchSize + case c.General.QueueSize == 0: + logger.Infof("General.QueueSize unset, setting to %s", defaults.General.QueueSize) + c.General.QueueSize = defaults.General.QueueSize + case c.General.MaxWindowSize == 0: + logger.Infof("General.MaxWindowSize unset, setting to %s", defaults.General.MaxWindowSize) + c.General.MaxWindowSize = defaults.General.MaxWindowSize + case c.General.ListenAddress == "": + logger.Infof("General.ListenAddress unset, setting to %s", defaults.General.ListenAddress) + c.General.ListenAddress = defaults.General.ListenAddress + case c.General.ListenPort == 0: + logger.Infof("General.ListenPort unset, setting to %s", defaults.General.ListenPort) + c.General.ListenPort = defaults.General.ListenPort + case c.FileLedger.Prefix == "": + logger.Infof("FileLedger.Prefix unset, setting to %s", defaults.FileLedger.Prefix) + c.FileLedger.Prefix = defaults.FileLedger.Prefix + default: + return + } + } +} + +// Load parses the orderer.yaml file and environment, producing a struct suitable for config use +func Load() *TopLevel { + config := viper.New() + + // for environment variables + config.SetEnvPrefix(Prefix) + config.AutomaticEnv() + replacer := strings.NewReplacer(".", "_") + config.SetEnvKeyReplacer(replacer) + + config.SetConfigName("orderer") + config.AddConfigPath("./") + config.AddConfigPath("../orderer/") + config.AddConfigPath("../../orderer/") + // Path to look for the config file in based on GOPATH + gopath := os.Getenv("GOPATH") + for _, p := range filepath.SplitList(gopath) { + ordererPath := filepath.Join(p, "src/github.com/hyperledger/fabric/orderer/") + config.AddConfigPath(ordererPath) + } + + err := config.ReadInConfig() + if err != nil { + panic(fmt.Errorf("Error reading %s plugin config: %s", Prefix, err)) + } + + var uconf TopLevel + + err = ExactWithDateUnmarshal(config, &uconf) + if err != nil { + panic(fmt.Errorf("Error unmarshaling into structure: %s", err)) + } + + uconf.completeInitialization() + + return &uconf +} diff --git a/orderer/config/config_test.go b/orderer/config/config_test.go new file mode 100644 index 00000000000..5750b230eaa --- /dev/null +++ b/orderer/config/config_test.go @@ -0,0 +1,71 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "os" + "testing" + + "github.com/spf13/viper" +) + +func TestGoodConfig(t *testing.T) { + config := Load() + if config == nil { + t.Fatalf("Could not load config") + } + t.Logf("%+v", config) +} + +func TestBadConfig(t *testing.T) { + config := viper.New() + config.SetConfigName("orderer") + config.AddConfigPath("../") + + err := config.ReadInConfig() + if err != nil { + t.Fatalf("Error reading %s plugin config: %s", Prefix, err) + } + + var uconf struct{} + + err = ExactWithDateUnmarshal(config, &uconf) + if err == nil { + t.Fatalf("Should have failed to unmarshal") + } +} + +// TestEnvInnerVar verifies that with the Unmarshal function that +// the environmental overrides still work on internal vars. This was +// a bug in the original viper implementation that is worked around in +// the Load codepath for now +func TestEnvInnerVar(t *testing.T) { + envVar := "ORDERER_GENERAL_LISTENPORT" + envVal := uint16(80) + os.Setenv(envVar, fmt.Sprintf("%d", envVal)) + defer os.Unsetenv(envVar) + config := Load() + + if config == nil { + t.Fatalf("Could not load config") + } + + if config.General.ListenPort != envVal { + t.Fatalf("Environmental override of inner config did not work") + } +} diff --git a/orderer/config/config_util.go b/orderer/config/config_util.go new file mode 100644 index 00000000000..94cd3ccd75b --- /dev/null +++ b/orderer/config/config_util.go @@ -0,0 +1,69 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "github.com/mitchellh/mapstructure" + "github.com/spf13/viper" +) + +func getKeysRecursively(base string, v *viper.Viper, nodeKeys map[string]interface{}) map[string]interface{} { + result := make(map[string]interface{}) + for key := range nodeKeys { + fqKey := base + key + val := v.Get(fqKey) + if m, ok := val.(map[interface{}]interface{}); ok { + logger.Debugf("Found map value for %s", fqKey) + tmp := make(map[string]interface{}) + for ik, iv := range m { + cik, ok := ik.(string) + if !ok { + panic("Non string key-entry") + } + tmp[cik] = iv + } + result[key] = getKeysRecursively(fqKey+".", v, tmp) + } else { + logger.Debugf("Found real value for %s setting to %T %v", fqKey, val, val) + result[key] = val + } + } + return result +} + +// ExactWithDateUnmarshal is intended to unmarshal a config file into a structure +// producing error when extraneous variables are introduced and supporting +// the time.Duration type +func ExactWithDateUnmarshal(v *viper.Viper, output interface{}) error { + baseKeys := v.AllSettings() // AllKeys doesn't actually return all keys, it only returns the base ones + leafKeys := getKeysRecursively("", v, baseKeys) + + logger.Infof("%+v", leafKeys) + config := &mapstructure.DecoderConfig{ + ErrorUnused: true, + Metadata: nil, + Result: output, + WeaklyTypedInput: true, + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + } + + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return err + } + return decoder.Decode(leafKeys) +} diff --git a/orderer/main.go b/orderer/main.go index 44c075c7185..4f376783720 100644 --- a/orderer/main.go +++ b/orderer/main.go @@ -21,57 +21,55 @@ import ( "io/ioutil" "net" "os" - "time" + "github.com/hyperledger/fabric/orderer/config" "github.com/hyperledger/fabric/orderer/rawledger" "github.com/hyperledger/fabric/orderer/rawledger/fileledger" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" "github.com/hyperledger/fabric/orderer/solo" + "github.com/op/go-logging" "google.golang.org/grpc" ) -func main() { +var logger = logging.MustGetLogger("orderer") - address := os.Getenv("ORDERER_LISTEN_ADDRESS") - if address == "" { - address = "127.0.0.1" - } +func init() { + logging.SetLevel(logging.DEBUG, "") +} - port := os.Getenv("ORDERER_LISTEN_PORT") - if port == "" { - port = "5005" - } +func main() { - lis, err := net.Listen("tcp", address+":"+port) + config := config.Load() + grpcServer := grpc.NewServer() + + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort)) if err != nil { fmt.Println("Failed to listen:", err) return } - grpcServer := grpc.NewServer() - // Stand in until real config ledgerType := os.Getenv("ORDERER_LEDGER_TYPE") var rawledger rawledger.ReadWriter switch ledgerType { case "file": - name, err := ioutil.TempDir("", "hyperledger") // TODO, config - if err != nil { - panic(fmt.Errorf("Error creating temp dir: %s", err)) + location := config.FileLedger.Location + if location == "" { + var err error + location, err = ioutil.TempDir("", config.FileLedger.Prefix) + if err != nil { + panic(fmt.Errorf("Error creating temp dir: %s", err)) + } } - rawledger = fileledger.New(name) + rawledger = fileledger.New(location) case "ram": fallthrough default: - historySize := 10 // TODO, config - rawledger = ramledger.New(historySize) + rawledger = ramledger.New(int(config.RAMLedger.HistorySize)) } - queueSize := 100 // TODO configure - batchSize := 10 - batchTimeout := 10 * time.Second - solo.New(queueSize, batchSize, batchTimeout, rawledger, grpcServer) + solo.New(int(config.General.QueueSize), int(config.General.BatchSize), int(config.General.MaxWindowSize), config.General.BatchTimeout, rawledger, grpcServer) grpcServer.Serve(lis) } diff --git a/orderer/orderer.yaml b/orderer/orderer.yaml new file mode 100644 index 00000000000..f6f4657c08f --- /dev/null +++ b/orderer/orderer.yaml @@ -0,0 +1,68 @@ +--- +################################################################################ +# +# Orderer Configuration +# +# - This controls the type and configuration for the orderer which is started +# - This controls the type and configuration for the rawledger if needed +# +################################################################################ +General: + + # Orderer Type: The orderer implementation to start + # Presently only solo is supported + OrdererType: solo + + # Ledger Type: The ledger type to provide to the orderer (if needed) + # Available types are "ram" and "file" + LedgerType: ram + + # Batch Timeout: The amount of time to wait before creating a batch + BatchTimeout: 10s + + # Batch Size: The maximum number of messages to permit in a batch + BatchSize: 10 + + # Queue Size: The maximum number of messages to allow pending from a gRPC client + QueueSize: 10 + + # Max Window Size: The maximum number of messages to for the orderer Deliver + # to allow before acknowledgement must be received from the client + MaxWindowSize: 1000 + + # Listen address: The IP on which to bind to listen + ListenAddress: 127.0.0.1 + + # Listen port: The port on which to bind to listen + ListenPort: 5151 + +################################################################################ +# +# SECTION: RAM Ledger +# +# - This section applies to the configuration of the RAM ledger +# +################################################################################ +RAMLedger: + + # History Size: The number of blocks that the RAM ledger is set to retain + HistorySize: 1000 + + +################################################################################ +# +# SECTION: File Ledger +# +# - This section applies to the configuration of the file ledger +# +################################################################################ +FileLedger: + + # Location : The directory to store the blocks in + # NOTE: if this unset, a temporary location will be chosen using + # the prefix specified by Prefix + Location: + + # The prefix to use when generating a ledger directory in temporary space + # Otherwise, this value is ignored + Prefix: hyperledger-fabric-rawledger diff --git a/orderer/sample_clients/broadcast_timestamp/client.go b/orderer/sample_clients/broadcast_timestamp/client.go index 5e3e217de06..5f7977c169b 100644 --- a/orderer/sample_clients/broadcast_timestamp/client.go +++ b/orderer/sample_clients/broadcast_timestamp/client.go @@ -21,6 +21,7 @@ import ( "time" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/config" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -39,7 +40,8 @@ func (s *broadcastClient) broadcast(transaction []byte) error { } func main() { - serverAddr := "127.0.0.1:5005" + config := config.Load() + serverAddr := fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort) conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) defer conn.Close() if err != nil { diff --git a/orderer/sample_clients/deliver_stdout/client.go b/orderer/sample_clients/deliver_stdout/client.go index 65a932f6272..1eb0510ad0d 100644 --- a/orderer/sample_clients/deliver_stdout/client.go +++ b/orderer/sample_clients/deliver_stdout/client.go @@ -20,6 +20,7 @@ import ( "fmt" ab "github.com/hyperledger/fabric/orderer/atomicbroadcast" + "github.com/hyperledger/fabric/orderer/config" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -101,7 +102,8 @@ func (r *deliverClient) readUntilClose() { } func main() { - serverAddr := "127.0.0.1:5005" + config := config.Load() + serverAddr := fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort) conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) if err != nil { fmt.Println("Error connecting:", err) diff --git a/orderer/solo/deliver.go b/orderer/solo/deliver.go index 825131537a9..20d91ec4da8 100644 --- a/orderer/solo/deliver.go +++ b/orderer/solo/deliver.go @@ -172,7 +172,7 @@ func (d *deliverer) processUpdate(update *ab.SeekInfo) bool { } logger.Debugf("Updating properties for client") - if update == nil || update.WindowSize == 0 || update.WindowSize > MagicLargestWindow { + if update == nil || update.WindowSize == 0 || update.WindowSize > uint64(d.ds.maxWindow) { close(d.exitChan) return d.sendErrorReply(ab.Status_BAD_REQUEST) } diff --git a/orderer/solo/deliver_test.go b/orderer/solo/deliver_test.go index 6a2991fbce0..5e41a5af4fb 100644 --- a/orderer/solo/deliver_test.go +++ b/orderer/solo/deliver_test.go @@ -27,6 +27,9 @@ import ( "github.com/hyperledger/fabric/orderer/rawledger/ramledger" ) +// MagicLargestWindow is used as the default max window size for initializing the deliver service +const MagicLargestWindow int = 1000 + type mockD struct { grpc.ServerStream recvChan chan *ab.DeliverUpdate @@ -66,7 +69,7 @@ func TestOldestSeek(t *testing.T) { go ds.handleDeliver(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}} count := 0 for { @@ -98,7 +101,7 @@ func TestNewestSeek(t *testing.T) { go ds.handleDeliver(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow, Start: ab.SeekInfo_NEWEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}} select { case blockReply := <-m.sendChan: @@ -126,7 +129,7 @@ func TestSpecificSeek(t *testing.T) { go ds.handleDeliver(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow, Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} select { case blockReply := <-m.sendChan: @@ -155,7 +158,7 @@ func TestBadSeek(t *testing.T) { go ds.handleDeliver(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow, Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} select { case blockReply := <-m.sendChan: @@ -166,7 +169,7 @@ func TestBadSeek(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow, Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize)}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(3 * ledgerSize)}}} select { case blockReply := <-m.sendChan: @@ -188,7 +191,7 @@ func TestBadWindow(t *testing.T) { go ds.handleDeliver(m) - m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: MagicLargestWindow * 2, Start: ab.SeekInfo_OLDEST}}} + m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}} select { case blockReply := <-m.sendChan: diff --git a/orderer/solo/solo.go b/orderer/solo/solo.go index 972c9375e9a..62d28882b85 100644 --- a/orderer/solo/solo.go +++ b/orderer/solo/solo.go @@ -26,26 +26,23 @@ import ( "google.golang.org/grpc" ) -var logger = logging.MustGetLogger("solo/server") +var logger = logging.MustGetLogger("orderer/solo") func init() { logging.SetLevel(logging.DEBUG, "") } -// MagicLargestWindow is a temporary constant which limits the maximum window size, TODO this should be config at a later date -const MagicLargestWindow = 1000 - type server struct { bs *broadcastServer ds *deliverServer } // New creates a ab.AtomicBroadcastServer based on the solo orderer implementation -func New(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server) ab.AtomicBroadcastServer { +func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server) ab.AtomicBroadcastServer { logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl) s := &server{ bs: newBroadcastServer(queueSize, batchSize, batchTimeout, rl), - ds: newDeliverServer(rl, MagicLargestWindow), + ds: newDeliverServer(rl, maxWindowSize), } ab.RegisterAtomicBroadcastServer(grpcServer, s) return s