From d59af2c0090d80cfafe228de80c20595c8947154 Mon Sep 17 00:00:00 2001 From: Will Lahti Date: Mon, 26 Oct 2020 09:14:06 -0400 Subject: [PATCH] Expose new Admin endpoint on orderer for the channel participation API (#1939) * Split out common server logic for http endpoints Move the common logic for http server/handler and TLS authentication used by the operations.System into a separate fabhttp.Server and refactor the operations.System to embed the server. This is being done to allow reuse of the common code for the orderer Admin endpoint, which will host the Channel Participation API. FAB-18246 Signed-off-by: Will Lahti * Expose new Admin endpoint on orderer Move the channel participation API to use this endpoint. FAB-18246 #done Signed-off-by: Will Lahti --- common/fabhttp/fabhttp_suite_test.go | 74 ++++++ common/fabhttp/fakes/logger.go | 115 +++++++++ common/fabhttp/server.go | 144 +++++++++++ common/fabhttp/server_test.go | 232 ++++++++++++++++++ {core/operations => common/fabhttp}/tls.go | 2 +- .../operations => common/fabhttp}/tls_test.go | 30 +-- core/operations/system.go | 112 ++------- core/operations/system_test.go | 25 +- .../channel_participation.go | 8 +- integration/nwo/network.go | 3 +- integration/nwo/orderer_template.go | 11 + .../raft/channel_participation_test.go | 4 +- internal/peer/node/start.go | 21 +- orderer/common/localconfig/config.go | 10 + orderer/common/server/main.go | 49 +++- sampleconfig/orderer.yaml | 31 ++- 16 files changed, 716 insertions(+), 155 deletions(-) create mode 100644 common/fabhttp/fabhttp_suite_test.go create mode 100644 common/fabhttp/fakes/logger.go create mode 100644 common/fabhttp/server.go create mode 100644 common/fabhttp/server_test.go rename {core/operations => common/fabhttp}/tls.go (98%) rename {core/operations => common/fabhttp}/tls_test.go (84%) diff --git a/common/fabhttp/fabhttp_suite_test.go b/common/fabhttp/fabhttp_suite_test.go new file mode 100644 index 00000000000..c2775e2fafe --- /dev/null +++ b/common/fabhttp/fabhttp_suite_test.go @@ -0,0 +1,74 @@ +/* +Copyright IBM Corp All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabhttp_test + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net/http" + "path/filepath" + "testing" + + "github.com/hyperledger/fabric/common/crypto/tlsgen" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestFabHTTP(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "FabHTTP Suite") +} + +func generateCertificates(tempDir string) { + serverCA, err := tlsgen.NewCA() + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "server-ca.pem"), serverCA.CertBytes(), 0640) + Expect(err).NotTo(HaveOccurred()) + serverKeyPair, err := serverCA.NewServerCertKeyPair("127.0.0.1") + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "server-cert.pem"), serverKeyPair.Cert, 0640) + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "server-key.pem"), serverKeyPair.Key, 0640) + Expect(err).NotTo(HaveOccurred()) + + clientCA, err := tlsgen.NewCA() + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "client-ca.pem"), clientCA.CertBytes(), 0640) + Expect(err).NotTo(HaveOccurred()) + clientKeyPair, err := clientCA.NewClientCertKeyPair() + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "client-cert.pem"), clientKeyPair.Cert, 0640) + Expect(err).NotTo(HaveOccurred()) + err = ioutil.WriteFile(filepath.Join(tempDir, "client-key.pem"), clientKeyPair.Key, 0640) + Expect(err).NotTo(HaveOccurred()) +} + +func newHTTPClient(tlsDir string, withClientCert bool) *http.Client { + clientCertPool := x509.NewCertPool() + caCert, err := ioutil.ReadFile(filepath.Join(tlsDir, "server-ca.pem")) + Expect(err).NotTo(HaveOccurred()) + clientCertPool.AppendCertsFromPEM(caCert) + + tlsClientConfig := &tls.Config{ + RootCAs: clientCertPool, + } + if withClientCert { + clientCert, err := tls.LoadX509KeyPair( + filepath.Join(tlsDir, "client-cert.pem"), + filepath.Join(tlsDir, "client-key.pem"), + ) + Expect(err).NotTo(HaveOccurred()) + tlsClientConfig.Certificates = []tls.Certificate{clientCert} + } + + return &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsClientConfig, + }, + } +} diff --git a/common/fabhttp/fakes/logger.go b/common/fabhttp/fakes/logger.go new file mode 100644 index 00000000000..5e6e3236bae --- /dev/null +++ b/common/fabhttp/fakes/logger.go @@ -0,0 +1,115 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package fakes + +import ( + "sync" + + "github.com/hyperledger/fabric/common/fabhttp" +) + +type Logger struct { + WarnStub func(...interface{}) + warnMutex sync.RWMutex + warnArgsForCall []struct { + arg1 []interface{} + } + WarnfStub func(string, ...interface{}) + warnfMutex sync.RWMutex + warnfArgsForCall []struct { + arg1 string + arg2 []interface{} + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *Logger) Warn(arg1 ...interface{}) { + fake.warnMutex.Lock() + fake.warnArgsForCall = append(fake.warnArgsForCall, struct { + arg1 []interface{} + }{arg1}) + fake.recordInvocation("Warn", []interface{}{arg1}) + fake.warnMutex.Unlock() + if fake.WarnStub != nil { + fake.WarnStub(arg1...) + } +} + +func (fake *Logger) WarnCallCount() int { + fake.warnMutex.RLock() + defer fake.warnMutex.RUnlock() + return len(fake.warnArgsForCall) +} + +func (fake *Logger) WarnCalls(stub func(...interface{})) { + fake.warnMutex.Lock() + defer fake.warnMutex.Unlock() + fake.WarnStub = stub +} + +func (fake *Logger) WarnArgsForCall(i int) []interface{} { + fake.warnMutex.RLock() + defer fake.warnMutex.RUnlock() + argsForCall := fake.warnArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *Logger) Warnf(arg1 string, arg2 ...interface{}) { + fake.warnfMutex.Lock() + fake.warnfArgsForCall = append(fake.warnfArgsForCall, struct { + arg1 string + arg2 []interface{} + }{arg1, arg2}) + fake.recordInvocation("Warnf", []interface{}{arg1, arg2}) + fake.warnfMutex.Unlock() + if fake.WarnfStub != nil { + fake.WarnfStub(arg1, arg2...) + } +} + +func (fake *Logger) WarnfCallCount() int { + fake.warnfMutex.RLock() + defer fake.warnfMutex.RUnlock() + return len(fake.warnfArgsForCall) +} + +func (fake *Logger) WarnfCalls(stub func(string, ...interface{})) { + fake.warnfMutex.Lock() + defer fake.warnfMutex.Unlock() + fake.WarnfStub = stub +} + +func (fake *Logger) WarnfArgsForCall(i int) (string, []interface{}) { + fake.warnfMutex.RLock() + defer fake.warnfMutex.RUnlock() + argsForCall := fake.warnfArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *Logger) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.warnMutex.RLock() + defer fake.warnMutex.RUnlock() + fake.warnfMutex.RLock() + defer fake.warnfMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *Logger) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ fabhttp.Logger = new(Logger) diff --git a/common/fabhttp/server.go b/common/fabhttp/server.go new file mode 100644 index 00000000000..e6d507eb393 --- /dev/null +++ b/common/fabhttp/server.go @@ -0,0 +1,144 @@ +/* +Copyright IBM Corp All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabhttp + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "os" + "time" + + "github.com/hyperledger/fabric/common/flogging" + "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/core/middleware" +) + +//go:generate counterfeiter -o fakes/logger.go -fake-name Logger . Logger + +type Logger interface { + Warn(args ...interface{}) + Warnf(template string, args ...interface{}) +} + +type Options struct { + Logger Logger + ListenAddress string + TLS TLS +} + +type Server struct { + logger Logger + options Options + httpServer *http.Server + mux *http.ServeMux + addr string +} + +func NewServer(o Options) *Server { + logger := o.Logger + if logger == nil { + logger = flogging.MustGetLogger("fabhttp") + } + + server := &Server{ + logger: logger, + options: o, + } + + server.initializeServer() + + return server +} + +func (s *Server) Run(signals <-chan os.Signal, ready chan<- struct{}) error { + err := s.Start() + if err != nil { + return err + } + + close(ready) + + <-signals + return s.Stop() +} + +func (s *Server) Start() error { + listener, err := s.Listen() + if err != nil { + return err + } + s.addr = listener.Addr().String() + + go s.httpServer.Serve(listener) + + return nil +} + +func (s *Server) Stop() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return s.httpServer.Shutdown(ctx) +} + +func (s *Server) initializeServer() { + s.mux = http.NewServeMux() + s.httpServer = &http.Server{ + Addr: s.options.ListenAddress, + Handler: s.mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 2 * time.Minute, + } +} + +func (s *Server) HandlerChain(h http.Handler, secure bool) http.Handler { + if secure { + return middleware.NewChain(middleware.RequireCert(), middleware.WithRequestID(util.GenerateUUID)).Handler(h) + } + return middleware.NewChain(middleware.WithRequestID(util.GenerateUUID)).Handler(h) +} + +// RegisterHandler registers into the ServeMux a handler chain that borrows +// its security properties from the fabhttp.Server. This method is thread +// safe because ServeMux.Handle() is thread safe, and options are immutable. +// This method can be called either before or after Server.Start(). If the +// pattern exists the method panics. +func (s *Server) RegisterHandler(pattern string, handler http.Handler, secure bool) { + s.mux.Handle( + pattern, + s.HandlerChain( + handler, + secure, + ), + ) +} + +func (s *Server) Listen() (net.Listener, error) { + listener, err := net.Listen("tcp", s.options.ListenAddress) + if err != nil { + return nil, err + } + tlsConfig, err := s.options.TLS.Config() + if err != nil { + return nil, err + } + if tlsConfig != nil { + listener = tls.NewListener(listener, tlsConfig) + } + return listener, nil +} + +func (s *Server) Addr() string { + return s.addr +} + +func (s *Server) Log(keyvals ...interface{}) error { + s.logger.Warn(keyvals...) + return nil +} diff --git a/common/fabhttp/server_test.go b/common/fabhttp/server_test.go new file mode 100644 index 00000000000..5d2248e578d --- /dev/null +++ b/common/fabhttp/server_test.go @@ -0,0 +1,232 @@ +/* +Copyright IBM Corp All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabhttp_test + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "path/filepath" + "syscall" + + "github.com/hyperledger/fabric/common/fabhttp" + "github.com/hyperledger/fabric/core/operations/fakes" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/tedsuo/ifrit" +) + +var _ = Describe("Server", func() { + const AdditionalTestApiPath = "/some-additional-test-api" + + var ( + fakeLogger *fakes.Logger + tempDir string + + client *http.Client + unauthClient *http.Client + options fabhttp.Options + server *fabhttp.Server + ) + + BeforeEach(func() { + var err error + tempDir, err = ioutil.TempDir("", "fabhttp-test") + Expect(err).NotTo(HaveOccurred()) + + generateCertificates(tempDir) + client = newHTTPClient(tempDir, true) + unauthClient = newHTTPClient(tempDir, false) + + fakeLogger = &fakes.Logger{} + options = fabhttp.Options{ + Logger: fakeLogger, + ListenAddress: "127.0.0.1:0", + TLS: fabhttp.TLS{ + Enabled: true, + CertFile: filepath.Join(tempDir, "server-cert.pem"), + KeyFile: filepath.Join(tempDir, "server-key.pem"), + ClientCertRequired: false, + ClientCACertFiles: []string{filepath.Join(tempDir, "client-ca.pem")}, + }, + } + + server = fabhttp.NewServer(options) + }) + + AfterEach(func() { + os.RemoveAll(tempDir) + if server != nil { + server.Stop() + } + }) + + It("does not host a secure endpoint for additional APIs by default", func() { + err := server.Start() + Expect(err).NotTo(HaveOccurred()) + + addApiURL := fmt.Sprintf("https://%s%s", server.Addr(), AdditionalTestApiPath) + resp, err := client.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) // service is not handled by default, i.e. in peer + resp.Body.Close() + + resp, err = unauthClient.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + }) + + It("hosts a secure endpoint for additional APIs when added", func() { + server.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "secure"}, options.TLS.Enabled) + err := server.Start() + Expect(err).NotTo(HaveOccurred()) + + addApiURL := fmt.Sprintf("https://%s%s", server.Addr(), AdditionalTestApiPath) + resp, err := client.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(resp.Header.Get("Content-Type")).To(Equal("text/plain; charset=utf-8")) + buff, err := ioutil.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + Expect(string(buff)).To(Equal("secure")) + resp.Body.Close() + + resp, err = unauthClient.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusUnauthorized)) + }) + + Context("when TLS is disabled", func() { + BeforeEach(func() { + options.TLS.Enabled = false + server = fabhttp.NewServer(options) + }) + + It("does not host an insecure endpoint for additional APIs by default", func() { + err := server.Start() + Expect(err).NotTo(HaveOccurred()) + + addApiURL := fmt.Sprintf("http://%s%s", server.Addr(), AdditionalTestApiPath) + resp, err := client.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) // service is not handled by default, i.e. in peer + resp.Body.Close() + }) + + It("hosts an insecure endpoint for additional APIs when added", func() { + server.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "insecure"}, options.TLS.Enabled) + err := server.Start() + Expect(err).NotTo(HaveOccurred()) + + addApiURL := fmt.Sprintf("http://%s%s", server.Addr(), AdditionalTestApiPath) + resp, err := client.Get(addApiURL) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(resp.Header.Get("Content-Type")).To(Equal("text/plain; charset=utf-8")) + buff, err := ioutil.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + Expect(string(buff)).To(Equal("insecure")) + resp.Body.Close() + }) + }) + + Context("when ClientCertRequired is true", func() { + BeforeEach(func() { + options.TLS.ClientCertRequired = true + server = fabhttp.NewServer(options) + }) + + It("requires a client cert to connect", func() { + err := server.Start() + Expect(err).NotTo(HaveOccurred()) + + _, err = unauthClient.Get(fmt.Sprintf("https://%s/healthz", server.Addr())) + Expect(err).To(MatchError(ContainSubstring("remote error: tls: bad certificate"))) + }) + }) + + Context("when listen fails", func() { + var listener net.Listener + + BeforeEach(func() { + var err error + listener, err = net.Listen("tcp", "127.0.0.1:0") + Expect(err).NotTo(HaveOccurred()) + + options.ListenAddress = listener.Addr().String() + server = fabhttp.NewServer(options) + }) + + AfterEach(func() { + listener.Close() + }) + + It("returns an error", func() { + err := server.Start() + Expect(err).To(MatchError(ContainSubstring("bind: address already in use"))) + }) + }) + + Context("when a bad TLS configuration is provided", func() { + BeforeEach(func() { + options.TLS.CertFile = "cert-file-does-not-exist" + server = fabhttp.NewServer(options) + }) + + It("returns an error", func() { + err := server.Start() + Expect(err).To(MatchError("open cert-file-does-not-exist: no such file or directory")) + }) + }) + + It("proxies Log to the provided logger", func() { + err := server.Log("key", "value") + Expect(err).NotTo(HaveOccurred()) + + Expect(fakeLogger.WarnCallCount()).To(Equal(1)) + Expect(fakeLogger.WarnArgsForCall(0)).To(Equal([]interface{}{"key", "value"})) + }) + + Context("when a logger is not provided", func() { + BeforeEach(func() { + options.Logger = nil + server = fabhttp.NewServer(options) + }) + + It("does not panic when logging", func() { + Expect(func() { server.Log("key", "value") }).NotTo(Panic()) + }) + + It("returns nil from Log", func() { + err := server.Log("key", "value") + Expect(err).NotTo(HaveOccurred()) + }) + }) + + It("supports ifrit", func() { + process := ifrit.Invoke(server) + Eventually(process.Ready()).Should(BeClosed()) + + process.Signal(syscall.SIGTERM) + Eventually(process.Wait()).Should(Receive(BeNil())) + }) + + Context("when start fails and ifrit is used", func() { + BeforeEach(func() { + options.TLS.CertFile = "non-existent-file" + server = fabhttp.NewServer(options) + }) + + It("does not close the ready chan", func() { + process := ifrit.Invoke(server) + Consistently(process.Ready()).ShouldNot(BeClosed()) + Eventually(process.Wait()).Should(Receive(MatchError("open non-existent-file: no such file or directory"))) + }) + }) +}) diff --git a/core/operations/tls.go b/common/fabhttp/tls.go similarity index 98% rename from core/operations/tls.go rename to common/fabhttp/tls.go index af243690daf..dd426bcdcd7 100644 --- a/core/operations/tls.go +++ b/common/fabhttp/tls.go @@ -4,7 +4,7 @@ Copyright IBM Corp All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ -package operations +package fabhttp import ( "crypto/tls" diff --git a/core/operations/tls_test.go b/common/fabhttp/tls_test.go similarity index 84% rename from core/operations/tls_test.go rename to common/fabhttp/tls_test.go index 018bd581ea4..f5103339e77 100644 --- a/core/operations/tls_test.go +++ b/common/fabhttp/tls_test.go @@ -4,7 +4,7 @@ Copyright IBM Corp All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ -package operations_test +package fabhttp_test import ( "crypto/tls" @@ -13,13 +13,13 @@ import ( "os" "path/filepath" - "github.com/hyperledger/fabric/core/operations" + "github.com/hyperledger/fabric/common/fabhttp" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("TLS", func() { - var opsTLS operations.TLS + var httpTLS fabhttp.TLS var tempDir string BeforeEach(func() { @@ -29,7 +29,7 @@ var _ = Describe("TLS", func() { generateCertificates(tempDir) - opsTLS = operations.TLS{ + httpTLS = fabhttp.TLS{ Enabled: true, CertFile: filepath.Join(tempDir, "server-cert.pem"), KeyFile: filepath.Join(tempDir, "server-key.pem"), @@ -57,7 +57,7 @@ var _ = Describe("TLS", func() { clientCAPool := x509.NewCertPool() clientCAPool.AppendCertsFromPEM(pemBytes) - tlsConfig, err := opsTLS.Config() + tlsConfig, err := httpTLS.Config() Expect(err).NotTo(HaveOccurred()) Expect(tlsConfig).To(Equal(&tls.Config{ Certificates: []tls.Certificate{cert}, @@ -76,11 +76,11 @@ var _ = Describe("TLS", func() { Context("when TLS is not enabled", func() { BeforeEach(func() { - opsTLS.Enabled = false + httpTLS.Enabled = false }) It("returns a nil config", func() { - tlsConfig, err := opsTLS.Config() + tlsConfig, err := httpTLS.Config() Expect(err).NotTo(HaveOccurred()) Expect(tlsConfig).To(BeNil()) }) @@ -88,11 +88,11 @@ var _ = Describe("TLS", func() { Context("when a client certificate is not required", func() { BeforeEach(func() { - opsTLS.ClientCertRequired = false + httpTLS.ClientCertRequired = false }) It("requests a client cert with verification", func() { - tlsConfig, err := opsTLS.Config() + tlsConfig, err := httpTLS.Config() Expect(err).NotTo(HaveOccurred()) Expect(tlsConfig.ClientAuth).To(Equal(tls.VerifyClientCertIfGiven)) }) @@ -100,22 +100,22 @@ var _ = Describe("TLS", func() { Context("when the server certificate cannot be constructed", func() { BeforeEach(func() { - opsTLS.CertFile = "non-existent-file" + httpTLS.CertFile = "non-existent-file" }) It("returns an error", func() { - _, err := opsTLS.Config() + _, err := httpTLS.Config() Expect(err).To(MatchError("open non-existent-file: no such file or directory")) }) }) Context("the client CA slice is empty", func() { BeforeEach(func() { - opsTLS.ClientCACertFiles = nil + httpTLS.ClientCACertFiles = nil }) It("builds a TLS configuration without an empty CA pool", func() { - tlsConfig, err := opsTLS.Config() + tlsConfig, err := httpTLS.Config() Expect(err).NotTo(HaveOccurred()) Expect(tlsConfig.ClientCAs.Subjects()).To(BeEmpty()) }) @@ -123,13 +123,13 @@ var _ = Describe("TLS", func() { Context("when a client CA cert cannot be read", func() { BeforeEach(func() { - opsTLS.ClientCACertFiles = []string{ + httpTLS.ClientCACertFiles = []string{ "non-existent-file", } }) It("returns an error", func() { - _, err := opsTLS.Config() + _, err := httpTLS.Config() Expect(err).To(MatchError("open non-existent-file: no such file or directory")) }) }) diff --git a/core/operations/system.go b/core/operations/system.go index 3c6520c810b..ea2e429f2a9 100644 --- a/core/operations/system.go +++ b/core/operations/system.go @@ -7,16 +7,13 @@ SPDX-License-Identifier: Apache-2.0 package operations import ( - "context" - "crypto/tls" "net" - "net/http" - "os" "strings" "time" kitstatsd "github.com/go-kit/kit/metrics/statsd" "github.com/hyperledger/fabric-lib-go/healthz" + "github.com/hyperledger/fabric/common/fabhttp" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/flogging/httpadmin" "github.com/hyperledger/fabric/common/metadata" @@ -25,8 +22,6 @@ import ( "github.com/hyperledger/fabric/common/metrics/prometheus" "github.com/hyperledger/fabric/common/metrics/statsd" "github.com/hyperledger/fabric/common/metrics/statsd/goruntime" - "github.com/hyperledger/fabric/common/util" - "github.com/hyperledger/fabric/core/middleware" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -50,14 +45,13 @@ type MetricsOptions struct { } type Options struct { - Logger Logger - ListenAddress string - Metrics MetricsOptions - TLS TLS - Version string + fabhttp.Options + Metrics MetricsOptions + Version string } type System struct { + *fabhttp.Server metrics.Provider logger Logger @@ -66,9 +60,6 @@ type System struct { statsd *kitstatsd.Statsd collectorTicker *time.Ticker sendTicker *time.Ticker - httpServer *http.Server - mux *http.ServeMux - addr string versionGauge metrics.Gauge } @@ -78,12 +69,14 @@ func NewSystem(o Options) *System { logger = flogging.MustGetLogger("operations.runner") } + s := fabhttp.NewServer(o.Options) + system := &System{ + Server: s, logger: logger, options: o, } - system.initializeServer() system.initializeHealthCheckHandler() system.initializeLoggingHandler() system.initializeMetricsProvider() @@ -92,18 +85,6 @@ func NewSystem(o Options) *System { return system } -func (s *System) Run(signals <-chan os.Signal, ready chan<- struct{}) error { - err := s.Start() - if err != nil { - return err - } - - close(ready) - - <-signals - return s.Stop() -} - func (s *System) Start() error { err := s.startMetricsTickers() if err != nil { @@ -112,15 +93,7 @@ func (s *System) Start() error { s.versionGauge.With("version", s.options.Version).Set(1) - listener, err := s.listen() - if err != nil { - return err - } - s.addr = listener.Addr().String() - - go s.httpServer.Serve(listener) - - return nil + return s.Server.Start() } func (s *System) Stop() error { @@ -132,33 +105,13 @@ func (s *System) Stop() error { s.sendTicker.Stop() s.sendTicker = nil } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - return s.httpServer.Shutdown(ctx) + return s.Server.Stop() } func (s *System) RegisterChecker(component string, checker healthz.HealthChecker) error { return s.healthHandler.RegisterChecker(component, checker) } -func (s *System) initializeServer() { - s.mux = http.NewServeMux() - s.httpServer = &http.Server{ - Addr: s.options.ListenAddress, - Handler: s.mux, - ReadTimeout: 10 * time.Second, - WriteTimeout: 2 * time.Minute, - } -} - -func (s *System) handlerChain(h http.Handler, secure bool) http.Handler { - if secure { - return middleware.NewChain(middleware.RequireCert(), middleware.WithRequestID(util.GenerateUUID)).Handler(h) - } - return middleware.NewChain(middleware.WithRequestID(util.GenerateUUID)).Handler(h) -} - func (s *System) initializeMetricsProvider() error { m := s.options.Metrics providerType := m.Provider @@ -178,7 +131,7 @@ func (s *System) initializeMetricsProvider() error { case "prometheus": s.Provider = &prometheus.Provider{} s.versionGauge = versionGauge(s.Provider) - s.mux.Handle("/metrics", s.handlerChain(promhttp.Handler(), s.options.TLS.Enabled)) + s.RegisterHandler("/metrics", promhttp.Handler(), s.options.TLS.Enabled) return nil default: @@ -193,12 +146,12 @@ func (s *System) initializeMetricsProvider() error { } func (s *System) initializeLoggingHandler() { - s.mux.Handle("/logspec", s.handlerChain(httpadmin.NewSpecHandler(), s.options.TLS.Enabled)) + s.RegisterHandler("/logspec", httpadmin.NewSpecHandler(), s.options.TLS.Enabled) } func (s *System) initializeHealthCheckHandler() { s.healthHandler = healthz.NewHealthHandler() - s.mux.Handle("/healthz", s.handlerChain(s.healthHandler, false)) + s.RegisterHandler("/healthz", s.healthHandler, false) } func (s *System) initializeVersionInfoHandler() { @@ -206,20 +159,7 @@ func (s *System) initializeVersionInfoHandler() { CommitSHA: metadata.CommitSHA, Version: metadata.Version, } - s.mux.Handle("/version", s.handlerChain(versionInfo, false)) -} - -// RegisterHandler registers into the ServeMux a handler chain that borrows its security properties from the -// operations.System. This method is thread safe because ServeMux.Handle() is thread safe, and options are immutable. -// This method can be called either before or after System.Start(). If the pattern exists the method panics. -func (s *System) RegisterHandler(pattern string, handler http.Handler) { - s.mux.Handle( - pattern, - s.handlerChain( - handler, - s.options.TLS.Enabled, - ), - ) + s.RegisterHandler("/version", versionInfo, false) } func (s *System) startMetricsTickers() error { @@ -246,27 +186,3 @@ func (s *System) startMetricsTickers() error { return nil } - -func (s *System) listen() (net.Listener, error) { - listener, err := net.Listen("tcp", s.options.ListenAddress) - if err != nil { - return nil, err - } - tlsConfig, err := s.options.TLS.Config() - if err != nil { - return nil, err - } - if tlsConfig != nil { - listener = tls.NewListener(listener, tlsConfig) - } - return listener, nil -} - -func (s *System) Addr() string { - return s.addr -} - -func (s *System) Log(keyvals ...interface{}) error { - s.logger.Warn(keyvals...) - return nil -} diff --git a/core/operations/system_test.go b/core/operations/system_test.go index ca5677b9700..18581c2504a 100644 --- a/core/operations/system_test.go +++ b/core/operations/system_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/hyperledger/fabric-lib-go/healthz" + "github.com/hyperledger/fabric/common/fabhttp" "github.com/hyperledger/fabric/common/metrics/disabled" "github.com/hyperledger/fabric/common/metrics/prometheus" "github.com/hyperledger/fabric/common/metrics/statsd" @@ -55,18 +56,20 @@ var _ = Describe("System", func() { fakeLogger = &fakes.Logger{} options = operations.Options{ - Logger: fakeLogger, - ListenAddress: "127.0.0.1:0", + Options: fabhttp.Options{ + Logger: fakeLogger, + ListenAddress: "127.0.0.1:0", + TLS: fabhttp.TLS{ + Enabled: true, + CertFile: filepath.Join(tempDir, "server-cert.pem"), + KeyFile: filepath.Join(tempDir, "server-key.pem"), + ClientCertRequired: false, + ClientCACertFiles: []string{filepath.Join(tempDir, "client-ca.pem")}, + }, + }, Metrics: operations.MetricsOptions{ Provider: "disabled", }, - TLS: operations.TLS{ - Enabled: true, - CertFile: filepath.Join(tempDir, "server-cert.pem"), - KeyFile: filepath.Join(tempDir, "server-key.pem"), - ClientCertRequired: false, - ClientCACertFiles: []string{filepath.Join(tempDir, "client-ca.pem")}, - }, Version: "test-version", } @@ -122,7 +125,7 @@ var _ = Describe("System", func() { }) It("hosts a secure endpoint for additional APIs when added", func() { - system.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "secure"}) + system.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "secure"}, options.TLS.Enabled) err := system.Start() Expect(err).NotTo(HaveOccurred()) @@ -169,7 +172,7 @@ var _ = Describe("System", func() { }) It("hosts an insecure endpoint for additional APIs when added", func() { - system.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "insecure"}) + system.RegisterHandler(AdditionalTestApiPath, &fakes.Handler{Code: http.StatusOK, Text: "insecure"}, options.TLS.Enabled) err := system.Start() Expect(err).NotTo(HaveOccurred()) diff --git a/integration/channelparticipation/channel_participation.go b/integration/channelparticipation/channel_participation.go index 00f8538d9c1..73041a03445 100644 --- a/integration/channelparticipation/channel_participation.go +++ b/integration/channelparticipation/channel_participation.go @@ -25,7 +25,7 @@ import ( func Join(n *nwo.Network, o *nwo.Orderer, channel string, block *common.Block, expectedChannelInfo ChannelInfo) { blockBytes, err := proto.Marshal(block) Expect(err).NotTo(HaveOccurred()) - url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.OperationsPort)) + url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.AdminPort)) req := GenerateJoinRequest(url, channel, blockBytes) authClient, _ := nwo.OrdererOperationalClients(n, o) @@ -75,7 +75,7 @@ type channelInfoShort struct { func List(n *nwo.Network, o *nwo.Orderer, expectedChannels []string, systemChannel ...string) { authClient, unauthClient := nwo.OrdererOperationalClients(n, o) - listChannelsURL := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.OperationsPort)) + listChannelsURL := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.AdminPort)) body := getBody(authClient, listChannelsURL)() list := &channelList{} @@ -139,7 +139,7 @@ type ChannelInfo struct { func ListOne(n *nwo.Network, o *nwo.Orderer, channel string) ChannelInfo { authClient, _ := nwo.OrdererOperationalClients(n, o) - listChannelURL := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.OperationsPort), channel) + listChannelURL := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.AdminPort), channel) body := getBody(authClient, listChannelURL)() c := &ChannelInfo{} @@ -150,7 +150,7 @@ func ListOne(n *nwo.Network, o *nwo.Orderer, channel string) ChannelInfo { func Remove(n *nwo.Network, o *nwo.Orderer, channel string) { authClient, _ := nwo.OrdererOperationalClients(n, o) - url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.OperationsPort), channel) + url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.AdminPort), channel) req, err := http.NewRequest(http.MethodDelete, url, nil) Expect(err).NotTo(HaveOccurred()) diff --git a/integration/nwo/network.go b/integration/nwo/network.go index fe5c71bad0c..39bf615b5a2 100644 --- a/integration/nwo/network.go +++ b/integration/nwo/network.go @@ -1669,6 +1669,7 @@ const ( ProfilePort PortName = "Profile" OperationsPort PortName = "Operations" ClusterPort PortName = "Cluster" + AdminPort PortName = "Admin" ) // PeerPortNames returns the list of ports that need to be reserved for a Peer. @@ -1679,7 +1680,7 @@ func PeerPortNames() []PortName { // OrdererPortNames returns the list of ports that need to be reserved for an // Orderer. func OrdererPortNames() []PortName { - return []PortName{ListenPort, ProfilePort, OperationsPort, ClusterPort} + return []PortName{ListenPort, ProfilePort, OperationsPort, ClusterPort, AdminPort} } // BrokerPortNames returns the list of ports that need to be reserved for a diff --git a/integration/nwo/orderer_template.go b/integration/nwo/orderer_template.go index 94ae56e6d92..6f84ad6a2fc 100644 --- a/integration/nwo/orderer_template.go +++ b/integration/nwo/orderer_template.go @@ -121,6 +121,17 @@ Metrics: {{- end }} WriteInterval: 5s Prefix: {{ ReplaceAll (ToLower Orderer.ID) "." "_" }} +Admin: + ListenAddress: 127.0.0.1:{{ .OrdererPort Orderer "Admin" }} + TLS: + Enabled: {{ .TLSEnabled }} + PrivateKey: {{ $w.OrdererLocalTLSDir Orderer }}/server.key + Certificate: {{ $w.OrdererLocalTLSDir Orderer }}/server.crt + RootCAs: + - {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt + ClientAuthRequired: {{ $w.ClientAuthRequired }} + ClientRootCAs: + - {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt {{- end }} ChannelParticipation: Enabled: {{ .Consensus.ChannelParticipationEnabled }} diff --git a/integration/raft/channel_participation_test.go b/integration/raft/channel_participation_test.go index 7cf6d7b027d..47bde11d76f 100644 --- a/integration/raft/channel_participation_test.go +++ b/integration/raft/channel_participation_test.go @@ -912,7 +912,7 @@ type errorResponse struct { func channelparticipationJoinFailure(n *nwo.Network, o *nwo.Orderer, channel string, block *common.Block, expectedStatus int, expectedError string) { blockBytes, err := proto.Marshal(block) Expect(err).NotTo(HaveOccurred()) - url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.OperationsPort)) + url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels", n.OrdererPort(o, nwo.AdminPort)) req := channelparticipation.GenerateJoinRequest(url, channel, blockBytes) authClient, _ := nwo.OrdererOperationalClients(n, o) @@ -935,7 +935,7 @@ func doBodyFailure(client *http.Client, req *http.Request, expectedStatus int, e func channelparticipationRemoveFailure(n *nwo.Network, o *nwo.Orderer, channel string, expectedStatus int, expectedError string) { authClient, _ := nwo.OrdererOperationalClients(n, o) - url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.OperationsPort), channel) + url := fmt.Sprintf("https://127.0.0.1:%d/participation/v1/channels/%s", n.OrdererPort(o, nwo.AdminPort), channel) req, err := http.NewRequest(http.MethodDelete, url, nil) Expect(err).NotTo(HaveOccurred()) diff --git a/internal/peer/node/start.go b/internal/peer/node/start.go index 2b46d1871fb..ea1dfa76f78 100644 --- a/internal/peer/node/start.go +++ b/internal/peer/node/start.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/crypto/tlsgen" "github.com/hyperledger/fabric/common/deliver" + "github.com/hyperledger/fabric/common/fabhttp" "github.com/hyperledger/fabric/common/flogging" floggingmetrics "github.com/hyperledger/fabric/common/flogging/metrics" "github.com/hyperledger/fabric/common/grpclogging" @@ -1221,8 +1222,17 @@ func initGossipService( func newOperationsSystem(coreConfig *peer.Config) *operations.System { return operations.NewSystem(operations.Options{ - Logger: flogging.MustGetLogger("peer.operations"), - ListenAddress: coreConfig.OperationsListenAddress, + Options: fabhttp.Options{ + Logger: flogging.MustGetLogger("peer.operations"), + ListenAddress: coreConfig.OperationsListenAddress, + TLS: fabhttp.TLS{ + Enabled: coreConfig.OperationsTLSEnabled, + CertFile: coreConfig.OperationsTLSCertFile, + KeyFile: coreConfig.OperationsTLSKeyFile, + ClientCertRequired: coreConfig.OperationsTLSClientAuthRequired, + ClientCACertFiles: coreConfig.OperationsTLSClientRootCAs, + }, + }, Metrics: operations.MetricsOptions{ Provider: coreConfig.MetricsProvider, Statsd: &operations.Statsd{ @@ -1232,13 +1242,6 @@ func newOperationsSystem(coreConfig *peer.Config) *operations.System { Prefix: coreConfig.StatsdPrefix, }, }, - TLS: operations.TLS{ - Enabled: coreConfig.OperationsTLSEnabled, - CertFile: coreConfig.OperationsTLSCertFile, - KeyFile: coreConfig.OperationsTLSKeyFile, - ClientCertRequired: coreConfig.OperationsTLSClientAuthRequired, - ClientCACertFiles: coreConfig.OperationsTLSClientRootCAs, - }, Version: metadata.Version, }) } diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 7c904c92077..a0448707d5e 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -29,6 +29,7 @@ type TopLevel struct { Operations Operations Metrics Metrics ChannelParticipation ChannelParticipation + Admin Admin } // General contains config which should be common among all orderer types. @@ -198,6 +199,12 @@ type Statsd struct { Prefix string } +// Admin configures the admin endpoint for the orderer. +type Admin struct { + ListenAddress string + TLS TLS +} + // ChannelParticipation provides the channel participation API configuration for the orderer. // Channel participation uses the same ListenAddress and TLS settings of the Operations service. type ChannelParticipation struct { @@ -283,6 +290,9 @@ var Defaults = TopLevel{ Enabled: false, MaxRequestBodySize: 1024 * 1024, }, + Admin: Admin{ + ListenAddress: "127.0.0.1:0", + }, } // Load parses the orderer YAML file and environment, producing diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 4d091c874f9..3441dece5dc 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -28,6 +28,7 @@ import ( "github.com/hyperledger/fabric/bccsp/factory" "github.com/hyperledger/fabric/common/channelconfig" "github.com/hyperledger/fabric/common/crypto" + "github.com/hyperledger/fabric/common/fabhttp" "github.com/hyperledger/fabric/common/flogging" floggingmetrics "github.com/hyperledger/fabric/common/flogging/metrics" "github.com/hyperledger/fabric/common/grpclogging" @@ -95,6 +96,10 @@ func Main() { } opsSystem := newOperationsSystem(conf.Operations, conf.Metrics) + if err = opsSystem.Start(); err != nil { + logger.Panicf("failed to start operations subsystem: %s", err) + } + defer opsSystem.Stop() metricsProvider := opsSystem.Provider logObserver := floggingmetrics.NewObserver(metricsProvider) flogging.SetObserver(logObserver) @@ -243,14 +248,16 @@ func Main() { tlsCallback, ) - opsSystem.RegisterHandler( + adminServer := newAdminServer(conf.Admin) + adminServer.RegisterHandler( channelparticipation.URLBaseV1, channelparticipation.NewHTTPHandler(conf.ChannelParticipation, manager), + conf.Admin.TLS.Enabled, ) - if err = opsSystem.Start(); err != nil { - logger.Panicf("failed to start operations subsystem: %s", err) + if err = adminServer.Start(); err != nil { + logger.Panicf("failed to start admin server: %s", err) } - defer opsSystem.Stop() + defer adminServer.Stop() mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert server := NewServer( @@ -848,8 +855,17 @@ func initializeEtcdraftConsenter( func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics) *operations.System { return operations.NewSystem(operations.Options{ - Logger: flogging.MustGetLogger("orderer.operations"), - ListenAddress: ops.ListenAddress, + Options: fabhttp.Options{ + Logger: flogging.MustGetLogger("orderer.operations"), + ListenAddress: ops.ListenAddress, + TLS: fabhttp.TLS{ + Enabled: ops.TLS.Enabled, + CertFile: ops.TLS.Certificate, + KeyFile: ops.TLS.PrivateKey, + ClientCertRequired: ops.TLS.ClientAuthRequired, + ClientCACertFiles: ops.TLS.ClientRootCAs, + }, + }, Metrics: operations.MetricsOptions{ Provider: metrics.Provider, Statsd: &operations.Statsd{ @@ -859,17 +875,24 @@ func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics Prefix: metrics.Statsd.Prefix, }, }, - TLS: operations.TLS{ - Enabled: ops.TLS.Enabled, - CertFile: ops.TLS.Certificate, - KeyFile: ops.TLS.PrivateKey, - ClientCertRequired: ops.TLS.ClientAuthRequired, - ClientCACertFiles: ops.TLS.ClientRootCAs, - }, Version: metadata.Version, }) } +func newAdminServer(admin localconfig.Admin) *fabhttp.Server { + return fabhttp.NewServer(fabhttp.Options{ + Logger: flogging.MustGetLogger("orderer.admin"), + ListenAddress: admin.ListenAddress, + TLS: fabhttp.TLS{ + Enabled: admin.TLS.Enabled, + CertFile: admin.TLS.Certificate, + KeyFile: admin.TLS.PrivateKey, + ClientCertRequired: admin.TLS.ClientAuthRequired, + ClientCACertFiles: admin.TLS.ClientRootCAs, + }, + }) +} + // caMgr manages certificate authorities scoped by channel type caManager struct { sync.Mutex diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index 72299f72b08..d05c8f78da2 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -310,7 +310,7 @@ Operations: ################################################################################ # -# Metrics Configuration +# Metrics Configuration # # - This configures metrics collection for the orderer # @@ -334,6 +334,35 @@ Metrics: # The prefix is prepended to all emitted statsd metrics Prefix: +################################################################################ +# +# Admin Configuration +# +# - This configures the admin server endpoint for the orderer +# +################################################################################ +Admin: + # host and port for the admin server + ListenAddress: 127.0.0.1:9443 + + # TLS configuration for the admin endpoint + TLS: + # TLS enabled + Enabled: false + + # Certificate is the location of the PEM encoded TLS certificate + Certificate: + + # PrivateKey points to the location of the PEM-encoded key + PrivateKey: + + # Most admin service endpoints require client authentication when TLS + # is enabled. ClientAuthRequired requires client certificate authentication + # at the TLS layer to access all resources. + ClientAuthRequired: false + + # Paths to PEM encoded ca certificates to trust for client authentication + ClientRootCAs: [] ################################################################################ #