diff --git a/bcs-common/common/http/httpclient/client.go b/bcs-common/common/http/httpclient/client.go index fd3fc2987a..522f7d75a9 100644 --- a/bcs-common/common/http/httpclient/client.go +++ b/bcs-common/common/http/httpclient/client.go @@ -97,6 +97,10 @@ func (client *HttpClient) SetTlsVerityConfig(tlsConf *tls.Config) { client.httpCli.Transport = trans } +func (client *HttpClient) SetTransPort(transport http.RoundTripper) { + client.httpCli.Transport = transport +} + func (client *HttpClient) NewTransPort() *http.Transport { return &http.Transport{ TLSHandshakeTimeout: 5 * time.Second, diff --git a/bcs-common/common/websocketDialer/client-dialer.go b/bcs-common/common/websocketDialer/client-dialer.go new file mode 100644 index 0000000000..13298d4b19 --- /dev/null +++ b/bcs-common/common/websocketDialer/client-dialer.go @@ -0,0 +1,71 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "io" + "net" + "sync" + "time" +) + +func clientDial(dialer Dialer, conn *connection, message *message) { + defer conn.Close() + + var ( + netConn net.Conn + err error + ) + + if dialer == nil { + netConn, err = net.DialTimeout(message.proto, message.address, time.Duration(message.deadline)*time.Millisecond) + } else { + netConn, err = dialer(message.proto, message.address) + } + + if err != nil { + conn.tunnelClose(err) + return + } + defer netConn.Close() + + pipe(conn, netConn) +} + +func pipe(client *connection, server net.Conn) { + wg := sync.WaitGroup{} + wg.Add(1) + + close := func(err error) error { + if err == nil { + err = io.EOF + } + client.doTunnelClose(err) + server.Close() + return err + } + + go func() { + defer wg.Done() + _, err := io.Copy(server, client) + close(err) + }() + + _, err := io.Copy(client, server) + err = close(err) + wg.Wait() + + // Write tunnel error after no more I/O is happening, just incase messages get out of order + client.writeErr(err) +} diff --git a/bcs-common/common/websocketDialer/client.go b/bcs-common/common/websocketDialer/client.go new file mode 100644 index 0000000000..8e0ad36abc --- /dev/null +++ b/bcs-common/common/websocketDialer/client.go @@ -0,0 +1,77 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "context" + "crypto/tls" + "io/ioutil" + "net/http" + "time" + + "bk-bcs/bcs-common/common/blog" + "github.com/gorilla/websocket" +) + +type ConnectAuthorizer func(proto, address string) bool + +func ClientConnect(ctx context.Context, wsURL string, headers http.Header, tlsConfig *tls.Config, dialer *websocket.Dialer, auth ConnectAuthorizer) { + if err := connectToProxy(ctx, wsURL, headers, tlsConfig, auth, dialer); err != nil { + time.Sleep(time.Duration(5) * time.Second) + } +} + +func connectToProxy(rootCtx context.Context, proxyURL string, headers http.Header, tlsConfig *tls.Config, auth ConnectAuthorizer, dialer *websocket.Dialer) error { + blog.Infof("connecting to proxy %s", proxyURL) + + if dialer == nil { + dialer = &websocket.Dialer{Proxy: http.ProxyFromEnvironment, HandshakeTimeout: HandshakeTimeOut, TLSClientConfig: tlsConfig} + } + ws, resp, err := dialer.Dial(proxyURL, headers) + if err != nil { + if resp == nil { + blog.Errorf("Failed to connect to proxy, empty dialer response") + } else { + rb, err2 := ioutil.ReadAll(resp.Body) + if err2 != nil { + blog.Errorf("Failed to connect to proxy. Response status: %v - %v. Couldn't read response body (err: %v)", resp.StatusCode, resp.Status, err2) + } else { + blog.Errorf("Failed to connect to proxy. Response status: %v - %v. Response body: %s. Error: %s", resp.StatusCode, resp.Status, rb, err.Error()) + } + } + return err + } + defer ws.Close() + + result := make(chan error, 2) + + ctx, cancel := context.WithCancel(rootCtx) + defer cancel() + + session := NewClientSession(auth, ws) + defer session.Close() + + go func() { + _, err = session.Serve(ctx) + result <- err + }() + + select { + case <-ctx.Done(): + blog.Infof("proxy %s done", proxyURL) + return nil + case err := <-result: + return err + } +} diff --git a/bcs-common/common/websocketDialer/connection.go b/bcs-common/common/websocketDialer/connection.go new file mode 100644 index 0000000000..afb55db281 --- /dev/null +++ b/bcs-common/common/websocketDialer/connection.go @@ -0,0 +1,211 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "context" + "errors" + "io" + "net" + "sync" + "time" + + "bk-bcs/bcs-common/common/websocketDialer/metrics" +) + +type connection struct { + sync.Mutex + + ctx context.Context + cancel func() + err error + writeDeadline time.Time + buf chan []byte + readBuf []byte + addr addr + session *Session + connID int64 +} + +func newConnection(connID int64, session *Session, proto, address string) *connection { + c := &connection{ + addr: addr{ + proto: proto, + address: address, + }, + connID: connID, + session: session, + buf: make(chan []byte, 1024), + } + metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address) + return c +} + +func (c *connection) tunnelClose(err error) { + metrics.IncSMTotalRemoveConnectionsForWS(c.session.clientKey, c.addr.Network(), c.addr.String()) + c.writeErr(err) + c.doTunnelClose(err) +} + +func (c *connection) doTunnelClose(err error) { + c.Lock() + defer c.Unlock() + + if c.err != nil { + return + } + + c.err = err + if c.err == nil { + c.err = io.ErrClosedPipe + } + + close(c.buf) +} + +func (c *connection) tunnelWriter() io.Writer { + return chanWriter{conn: c, C: c.buf} +} + +func (c *connection) Close() error { + c.session.closeConnection(c.connID, io.EOF) + return nil +} + +func (c *connection) copyData(b []byte) int { + n := copy(b, c.readBuf) + c.readBuf = c.readBuf[n:] + return n +} + +func (c *connection) Read(b []byte) (int, error) { + if len(b) == 0 { + return 0, nil + } + + n := c.copyData(b) + if n > 0 { + metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n)) + return n, nil + } + + next, ok := <-c.buf + if !ok { + err := io.EOF + c.Lock() + if c.err != nil { + err = c.err + } + c.Unlock() + return 0, err + } + + c.readBuf = next + n = c.copyData(b) + metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n)) + return n, nil +} + +func (c *connection) Write(b []byte) (int, error) { + c.Lock() + if c.err != nil { + defer c.Unlock() + return 0, c.err + } + c.Unlock() + + deadline := int64(0) + if !c.writeDeadline.IsZero() { + deadline = c.writeDeadline.Sub(time.Now()).Nanoseconds() / 1000000 + } + msg := newMessage(c.connID, deadline, b) + metrics.AddSMTotalTransmitBytesOnWS(c.session.clientKey, float64(len(msg.Bytes()))) + return c.session.writeMessage(msg) +} + +func (c *connection) writeErr(err error) { + if err != nil { + msg := newErrorMessage(c.connID, err) + metrics.AddSMTotalTransmitErrorBytesOnWS(c.session.clientKey, float64(len(msg.Bytes()))) + c.session.writeMessage(msg) + } +} + +func (c *connection) LocalAddr() net.Addr { + return c.addr +} + +func (c *connection) RemoteAddr() net.Addr { + return c.addr +} + +func (c *connection) SetDeadline(t time.Time) error { + if err := c.SetReadDeadline(t); err != nil { + return err + } + return c.SetWriteDeadline(t) +} + +func (c *connection) SetReadDeadline(t time.Time) error { + return nil +} + +func (c *connection) SetWriteDeadline(t time.Time) error { + c.writeDeadline = t + return nil +} + +type addr struct { + proto string + address string +} + +func (a addr) Network() string { + return a.proto +} + +func (a addr) String() string { + return a.address +} + +type chanWriter struct { + conn *connection + C chan []byte +} + +func (c chanWriter) Write(buf []byte) (int, error) { + c.conn.Lock() + defer c.conn.Unlock() + + if c.conn.err != nil { + return 0, c.conn.err + } + + newBuf := make([]byte, len(buf)) + copy(newBuf, buf) + buf = newBuf + + select { + // must copy the buffer + case c.C <- buf: + return len(buf), nil + default: + select { + case c.C <- buf: + return len(buf), nil + case <-time.After(15 * time.Second): + return 0, errors.New("backed up reader") + } + } +} diff --git a/bcs-common/common/websocketDialer/dialer.go b/bcs-common/common/websocketDialer/dialer.go new file mode 100644 index 0000000000..fde59bf3f4 --- /dev/null +++ b/bcs-common/common/websocketDialer/dialer.go @@ -0,0 +1,41 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "net" + "time" +) + +type Dialer func(network, address string) (net.Conn, error) + +func (s *Server) HasSession(clientKey string) bool { + _, err := s.sessions.getDialer(clientKey, 0) + return err == nil +} + +func (s *Server) Dial(clientKey string, deadline time.Duration, proto, address string) (net.Conn, error) { + d, err := s.sessions.getDialer(clientKey, deadline) + if err != nil { + return nil, err + } + + return d(proto, address) +} + +func (s *Server) Dialer(clientKey string, deadline time.Duration) Dialer { + return func(proto, address string) (net.Conn, error) { + return s.Dial(clientKey, deadline, proto, address) + } +} diff --git a/bcs-common/common/websocketDialer/message.go b/bcs-common/common/websocketDialer/message.go new file mode 100644 index 0000000000..678c5794ac --- /dev/null +++ b/bcs-common/common/websocketDialer/message.go @@ -0,0 +1,233 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "io/ioutil" + "math/rand" + "strings" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" +) + +const ( + Data messageType = iota + 1 + Connect + Error + AddClient + RemoveClient +) + +var ( + idCounter int64 +) + +func init() { + r := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) + idCounter = r.Int63() +} + +type messageType int64 + +type message struct { + id int64 + err error + connID int64 + deadline int64 + messageType messageType + bytes []byte + body io.Reader + proto string + address string +} + +func nextid() int64 { + return atomic.AddInt64(&idCounter, 1) +} + +func newMessage(connID int64, deadline int64, bytes []byte) *message { + return &message{ + id: nextid(), + connID: connID, + deadline: deadline, + messageType: Data, + bytes: bytes, + } +} + +func newConnect(connID int64, deadline time.Duration, proto, address string) *message { + return &message{ + id: nextid(), + connID: connID, + deadline: deadline.Nanoseconds() / 1000000, + messageType: Connect, + bytes: []byte(fmt.Sprintf("%s/%s", proto, address)), + proto: proto, + address: address, + } +} + +func newErrorMessage(connID int64, err error) *message { + return &message{ + id: nextid(), + err: err, + connID: connID, + messageType: Error, + bytes: []byte(err.Error()), + } +} + +func newAddClient(client string) *message { + return &message{ + id: nextid(), + messageType: AddClient, + address: client, + bytes: []byte(client), + } +} + +func newRemoveClient(client string) *message { + return &message{ + id: nextid(), + messageType: RemoveClient, + address: client, + bytes: []byte(client), + } +} + +func newServerMessage(reader io.Reader) (*message, error) { + buf := bufio.NewReader(reader) + + id, err := binary.ReadVarint(buf) + if err != nil { + return nil, err + } + + connID, err := binary.ReadVarint(buf) + if err != nil { + return nil, err + } + + mType, err := binary.ReadVarint(buf) + if err != nil { + return nil, err + } + + m := &message{ + id: id, + messageType: messageType(mType), + connID: connID, + body: buf, + } + + if m.messageType == Data || m.messageType == Connect { + deadline, err := binary.ReadVarint(buf) + if err != nil { + return nil, err + } + m.deadline = deadline + } + + if m.messageType == Connect { + bytes, err := ioutil.ReadAll(io.LimitReader(buf, 100)) + if err != nil { + return nil, err + } + parts := strings.SplitN(string(bytes), "/", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("failed to parse connect address") + } + m.proto = parts[0] + m.address = parts[1] + m.bytes = bytes + } else if m.messageType == AddClient || m.messageType == RemoveClient { + bytes, err := ioutil.ReadAll(io.LimitReader(buf, 100)) + if err != nil { + return nil, err + } + m.address = string(bytes) + m.bytes = bytes + } + + return m, nil +} + +func (m *message) Err() error { + if m.err != nil { + return m.err + } + bytes, err := ioutil.ReadAll(io.LimitReader(m.body, 100)) + if err != nil { + return err + } + + str := string(bytes) + if str == "EOF" { + m.err = io.EOF + } else { + m.err = errors.New(str) + } + return m.err +} + +func (m *message) Bytes() []byte { + return append(m.header(), m.bytes...) +} + +func (m *message) header() []byte { + buf := make([]byte, 24) + offset := 0 + offset += binary.PutVarint(buf[offset:], m.id) + offset += binary.PutVarint(buf[offset:], m.connID) + offset += binary.PutVarint(buf[offset:], int64(m.messageType)) + if m.messageType == Data || m.messageType == Connect { + offset += binary.PutVarint(buf[offset:], m.deadline) + } + return buf[:offset] +} + +func (m *message) Read(p []byte) (int, error) { + return m.body.Read(p) +} + +func (m *message) WriteTo(wsConn *wsConn) (int, error) { + err := wsConn.WriteMessage(websocket.BinaryMessage, m.Bytes()) + return len(m.bytes), err +} + +func (m *message) String() string { + switch m.messageType { + case Data: + if m.body == nil { + return fmt.Sprintf("%d DATA [%d]: %d bytes: %s", m.id, m.connID, len(m.bytes), string(m.bytes)) + } + return fmt.Sprintf("%d DATA [%d]: buffered", m.id, m.connID) + case Error: + return fmt.Sprintf("%d ERROR [%d]: %s", m.id, m.connID, m.Err()) + case Connect: + return fmt.Sprintf("%d CONNECT [%d]: %s/%s deadline %d", m.id, m.connID, m.proto, m.address, m.deadline) + case AddClient: + return fmt.Sprintf("%d ADDCLIENT [%s]", m.id, m.address) + case RemoveClient: + return fmt.Sprintf("%d REMOVECLIENT [%s]", m.id, m.address) + } + return fmt.Sprintf("%d UNKNOWN[%d]: %d", m.id, m.connID, m.messageType) +} diff --git a/bcs-common/common/websocketDialer/metrics/session_metrics.go b/bcs-common/common/websocketDialer/metrics/session_metrics.go new file mode 100644 index 0000000000..346e965a36 --- /dev/null +++ b/bcs-common/common/websocketDialer/metrics/session_metrics.go @@ -0,0 +1,220 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + TotalAddWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_add_websocket_session", + Help: "Total count of added websocket sessions", + }, + []string{"clientkey", "peer"}) + + TotalRemoveWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_remove_websocket_session", + Help: "Total count of removed websocket sessions", + }, + []string{"clientkey", "peer"}) + + TotalAddConnectionsForWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_add_connections", + Help: "Total count of added connections", + }, + []string{"clientkey", "proto", "addr"}, + ) + + TotalRemoveConnectionsForWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_remove_connections", + Help: "Total count of removed connections", + }, + []string{"clientkey", "proto", "addr"}, + ) + + TotalTransmitBytesOnWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_transmit_bytes", + Help: "Total bytes transmited", + }, + []string{"clientkey"}, + ) + + TotalTransmitErrorBytesOnWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_transmit_error_bytes", + Help: "Total error bytes transmited", + }, + []string{"clientkey"}, + ) + + TotalReceiveBytesOnWS = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_receive_bytes", + Help: "Total bytes recieved", + }, + []string{"clientkey"}, + ) + + TotalAddPeerAttempt = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_peer_ws_attempt", + Help: "Total count of attempts to establish websocket session to other bcs-api", + }, + []string{"peer"}, + ) + TotalPeerConnected = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_peer_ws_connected", + Help: "Total count of connected websocket sessions to other bcs-api", + }, + []string{"peer"}, + ) + TotalPeerDisConnected = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "bcs_api", + Subsystem: "session_server", + Name: "total_peer_ws_disconnected", + Help: "Total count of disconnected websocket sessions from other bcs-api", + }, + []string{"peer"}, + ) +) + +func init() { + // Session metrics + prometheus.MustRegister(TotalAddWS) + prometheus.MustRegister(TotalRemoveWS) + prometheus.MustRegister(TotalAddConnectionsForWS) + prometheus.MustRegister(TotalRemoveConnectionsForWS) + prometheus.MustRegister(TotalTransmitBytesOnWS) + prometheus.MustRegister(TotalTransmitErrorBytesOnWS) + prometheus.MustRegister(TotalReceiveBytesOnWS) + prometheus.MustRegister(TotalAddPeerAttempt) + prometheus.MustRegister(TotalPeerConnected) + prometheus.MustRegister(TotalPeerDisConnected) +} + +func IncSMTotalAddWS(clientKey string, peer bool) { + var peerStr string + if peer { + peerStr = "true" + } else { + peerStr = "false" + } + + TotalAddWS.With( + prometheus.Labels{ + "clientkey": clientKey, + "peer": peerStr, + }).Inc() +} + +func IncSMTotalRemoveWS(clientKey string, peer bool) { + var peerStr string + if peer { + peerStr = "true" + } else { + peerStr = "false" + } + TotalRemoveWS.With( + prometheus.Labels{ + "clientkey": clientKey, + "peer": peerStr, + }).Inc() +} + +func AddSMTotalTransmitErrorBytesOnWS(clientKey string, size float64) { + TotalTransmitErrorBytesOnWS.With( + prometheus.Labels{ + "clientkey": clientKey, + }).Add(size) +} + +func AddSMTotalTransmitBytesOnWS(clientKey string, size float64) { + TotalTransmitBytesOnWS.With( + prometheus.Labels{ + "clientkey": clientKey, + }).Add(size) +} + +func AddSMTotalReceiveBytesOnWS(clientKey string, size float64) { + TotalReceiveBytesOnWS.With( + prometheus.Labels{ + "clientkey": clientKey, + }).Add(size) +} + +func IncSMTotalAddConnectionsForWS(clientKey, proto, addr string) { + TotalAddConnectionsForWS.With( + prometheus.Labels{ + "clientkey": clientKey, + "proto": proto, + "addr": addr, + }).Inc() +} + +func IncSMTotalRemoveConnectionsForWS(clientKey, proto, addr string) { + TotalRemoveConnectionsForWS.With( + prometheus.Labels{ + "clientkey": clientKey, + "proto": proto, + "addr": addr, + }).Inc() +} + +func IncSMTotalAddPeerAttempt(peer string) { + TotalAddPeerAttempt.With( + prometheus.Labels{ + "peer": peer, + }).Inc() +} + +func IncSMTotalPeerConnected(peer string) { + TotalPeerConnected.With( + prometheus.Labels{ + "peer": peer, + }).Inc() +} + +func IncSMTotalPeerDisConnected(peer string) { + TotalPeerDisConnected.With( + prometheus.Labels{ + "peer": peer, + }).Inc() + +} diff --git a/bcs-common/common/websocketDialer/peer.go b/bcs-common/common/websocketDialer/peer.go new file mode 100644 index 0000000000..9c5685565d --- /dev/null +++ b/bcs-common/common/websocketDialer/peer.go @@ -0,0 +1,139 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "strings" + "time" + + "bk-bcs/bcs-common/common/blog" + "bk-bcs/bcs-common/common/websocketDialer/metrics" + "github.com/gorilla/websocket" +) + +var ( + Token = "BCS-API-Tunnel-Token" + ID = "BCS-API-Tunnel-ID" +) + +func (s *Server) AddPeer(url, id, token string) { + if s.PeerID == "" || s.PeerToken == "" { + return + } + + ctx, cancel := context.WithCancel(context.Background()) + peer := peer{ + url: url, + id: id, + token: token, + cancel: cancel, + } + + blog.Infof("Adding peer %s, %s", url, id) + + s.peerLock.Lock() + defer s.peerLock.Unlock() + + if p, ok := s.peers[id]; ok { + if p.equals(peer) { + return + } + p.cancel() + } + + s.peers[id] = peer + go peer.start(ctx, s) +} + +func (s *Server) RemovePeer(id string) { + s.peerLock.Lock() + defer s.peerLock.Unlock() + + if p, ok := s.peers[id]; ok { + blog.Infof("Removing peer %s", id) + p.cancel() + } + delete(s.peers, id) +} + +type peer struct { + url, id, token string + cancel func() +} + +func (p peer) equals(other peer) bool { + return p.url == other.url && + p.id == other.id && + p.token == other.token +} + +func (p *peer) start(ctx context.Context, s *Server) { + headers := http.Header{ + ID: {s.PeerID}, + Token: {s.PeerToken}, + } + + dialer := &websocket.Dialer{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + HandshakeTimeout: HandshakeTimeOut, + } + +outer: + for { + select { + case <-ctx.Done(): + break outer + default: + } + + metrics.IncSMTotalAddPeerAttempt(p.id) + // wait for peer server to be ok + time.Sleep(2 * time.Second) + ws, _, err := dialer.Dial(p.url, headers) + if err != nil { + blog.Errorf("Failed to connect to peer %s [local ID=%s]: %s", p.url, s.PeerID, err.Error()) + time.Sleep(5 * time.Second) + continue + } + metrics.IncSMTotalPeerConnected(p.id) + + session := NewClientSession(func(string, string) bool { return true }, ws) + session.dialer = func(network, address string) (net.Conn, error) { + parts := strings.SplitN(network, "::", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid clientKey/proto: %s", network) + } + return s.Dial(parts[0], 15*time.Second, parts[1], address) + } + + s.sessions.addListener(session) + _, err = session.Serve(context.Background()) + s.sessions.removeListener(session) + session.Close() + + if err != nil { + blog.Errorf("Failed to serve peer connection %s: %s", p.id, err.Error()) + } + + ws.Close() + time.Sleep(5 * time.Second) + } +} diff --git a/bcs-common/common/websocketDialer/server.go b/bcs-common/common/websocketDialer/server.go new file mode 100644 index 0000000000..abd0ee1d16 --- /dev/null +++ b/bcs-common/common/websocketDialer/server.go @@ -0,0 +1,117 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "context" + "net/http" + "sync" + "time" + + "bk-bcs/bcs-common/common/blog" + "github.com/gorilla/websocket" + "github.com/pkg/errors" +) + +var ( + errFailedAuth = errors.New("failed authentication") + errWrongMessageType = errors.New("wrong websocket message type") +) + +type Authorizer func(req *http.Request) (clientKey string, authed bool, err error) +type CleanCredentials func(clientKey string) +type ErrorWriter func(rw http.ResponseWriter, req *http.Request, code int, err error) + +func DefaultErrorWriter(rw http.ResponseWriter, req *http.Request, code int, err error) { + rw.Write([]byte(err.Error())) + rw.WriteHeader(code) +} + +type Server struct { + PeerID string + PeerToken string + authorizer Authorizer + cleanCredentials CleanCredentials + errorWriter ErrorWriter + sessions *sessionManager + peers map[string]peer + peerLock sync.Mutex +} + +func New(auth Authorizer, errorWriter ErrorWriter, clean CleanCredentials) *Server { + return &Server{ + peers: map[string]peer{}, + authorizer: auth, + cleanCredentials: clean, + errorWriter: errorWriter, + sessions: newSessionManager(), + } +} + +func (s *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + clientKey, authed, peer, err := s.auth(req) + if err != nil { + s.errorWriter(rw, req, 400, err) + return + } + if !authed { + s.errorWriter(rw, req, 401, errFailedAuth) + return + } + + blog.Infof("Handling backend connection request [%s]", clientKey) + + upgrader := websocket.Upgrader{ + HandshakeTimeout: 5 * time.Second, + CheckOrigin: func(r *http.Request) bool { return true }, + Error: s.errorWriter, + } + + wsConn, err := upgrader.Upgrade(rw, req, nil) + if err != nil { + s.errorWriter(rw, req, 400, errors.Wrapf(err, "Error during upgrade for host [%v]", clientKey)) + return + } + + session := s.sessions.add(clientKey, wsConn, peer) + defer s.sessions.remove(session) + + // Don't need to associate req.Context() to the Session, it will cancel otherwise + code, err := session.Serve(context.Background()) + if err != nil { + // Hijacked so we can't write to the client + blog.Infof("error in remotedialer server [%d]: %s", code, err.Error()) + // clean credentials from db + s.cleanCredentials(clientKey) + } +} + +// auth authorize a peer client +func (s *Server) auth(req *http.Request) (clientKey string, authed, peer bool, err error) { + id := req.Header.Get(ID) + token := req.Header.Get(Token) + if id != "" && token != "" { + // peer authentication + s.peerLock.Lock() + p, ok := s.peers[id] + s.peerLock.Unlock() + + if ok && p.token == token { + return id, true, true, nil + } + } + + id, authed, err = s.authorizer(req) + return id, authed, false, err +} diff --git a/bcs-common/common/websocketDialer/session.go b/bcs-common/common/websocketDialer/session.go new file mode 100644 index 0000000000..1d1b7d1b0f --- /dev/null +++ b/bcs-common/common/websocketDialer/session.go @@ -0,0 +1,316 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "bk-bcs/bcs-common/common/blog" + "github.com/gorilla/websocket" +) + +type Session struct { + sync.Mutex + + nextConnID int64 + clientKey string + sessionKey int64 + conn *wsConn + conns map[int64]*connection + remoteClientKeys map[string]map[int]bool + auth ConnectAuthorizer + pingCancel context.CancelFunc + pingWait sync.WaitGroup + dialer Dialer + client bool +} + +// PrintTunnelData No tunnel logging by default +var PrintTunnelData bool + +func init() { + if os.Getenv("CATTLE_TUNNEL_DATA_DEBUG") == "true" { + PrintTunnelData = true + } +} + +func NewClientSession(auth ConnectAuthorizer, conn *websocket.Conn) *Session { + return &Session{ + clientKey: "client", + conn: newWSConn(conn), + conns: map[int64]*connection{}, + auth: auth, + client: true, + } +} + +func newSession(sessionKey int64, clientKey string, conn *websocket.Conn) *Session { + return &Session{ + nextConnID: 1, + clientKey: clientKey, + sessionKey: sessionKey, + conn: newWSConn(conn), + conns: map[int64]*connection{}, + remoteClientKeys: map[string]map[int]bool{}, + } +} + +func (s *Session) startPings(rootCtx context.Context) { + ctx, cancel := context.WithCancel(rootCtx) + s.pingCancel = cancel + s.pingWait.Add(1) + + go func() { + defer s.pingWait.Done() + + t := time.NewTicker(PingWriteInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + s.conn.Lock() + if err := s.conn.conn.WriteControl(websocket.PingMessage, []byte(""), time.Now().Add(time.Second)); err != nil { + blog.Errorf("Error writing ping", err.Error()) + } + blog.Debug("Wrote ping") + s.conn.Unlock() + } + } + }() +} + +func (s *Session) stopPings() { + if s.pingCancel == nil { + return + } + + s.pingCancel() + s.pingWait.Wait() +} + +func (s *Session) Serve(ctx context.Context) (int, error) { + if s.client { + s.startPings(ctx) + } + + for { + msType, reader, err := s.conn.NextReader() + if err != nil { + return 400, err + } + + if msType != websocket.BinaryMessage { + return 400, errWrongMessageType + } + + if err := s.serveMessage(reader); err != nil { + return 500, err + } + } +} + +func (s *Session) serveMessage(reader io.Reader) error { + message, err := newServerMessage(reader) + if err != nil { + return err + } + + if PrintTunnelData { + blog.Debug("REQUEST ", message) + } + + if message.messageType == Connect { + if s.auth == nil || !s.auth(message.proto, message.address) { + return errors.New("connect not allowed") + } + s.clientConnect(message) + return nil + } + + s.Lock() + if message.messageType == AddClient && s.remoteClientKeys != nil { + err := s.addRemoteClient(message.address) + s.Unlock() + return err + } else if message.messageType == RemoveClient { + err := s.removeRemoteClient(message.address) + s.Unlock() + return err + } + conn := s.conns[message.connID] + s.Unlock() + + if conn == nil { + if message.messageType == Data { + err := fmt.Errorf("connection not found %s/%d/%d", s.clientKey, s.sessionKey, message.connID) + newErrorMessage(message.connID, err).WriteTo(s.conn) + } + return nil + } + + switch message.messageType { + case Data: + if _, err := io.Copy(conn.tunnelWriter(), message); err != nil { + s.closeConnection(message.connID, err) + } + case Error: + s.closeConnection(message.connID, message.Err()) + } + + return nil +} + +func parseAddress(address string) (string, int, error) { + parts := strings.SplitN(address, "/", 2) + if len(parts) != 2 { + return "", 0, errors.New("not / separated") + } + v, err := strconv.Atoi(parts[1]) + return parts[0], v, err +} + +func (s *Session) addRemoteClient(address string) error { + clientKey, sessionKey, err := parseAddress(address) + if err != nil { + return fmt.Errorf("invalid remote Session %s: %v", address, err) + } + + keys := s.remoteClientKeys[clientKey] + if keys == nil { + keys = map[int]bool{} + s.remoteClientKeys[clientKey] = keys + } + keys[int(sessionKey)] = true + + if PrintTunnelData { + blog.Debug("ADD REMOTE CLIENT %s, SESSION %d", address, s.sessionKey) + } + + return nil +} + +func (s *Session) removeRemoteClient(address string) error { + clientKey, sessionKey, err := parseAddress(address) + if err != nil { + return fmt.Errorf("invalid remote Session %s: %v", address, err) + } + + keys := s.remoteClientKeys[clientKey] + delete(keys, int(sessionKey)) + if len(keys) == 0 { + delete(s.remoteClientKeys, clientKey) + } + + if PrintTunnelData { + blog.Debug("REMOVE REMOTE CLIENT %s, SESSION %d", address, s.sessionKey) + } + + return nil +} + +func (s *Session) closeConnection(connID int64, err error) { + s.Lock() + conn := s.conns[connID] + delete(s.conns, connID) + if PrintTunnelData { + blog.Debug("CONNECTIONS %d %d", s.sessionKey, len(s.conns)) + } + s.Unlock() + + if conn != nil { + conn.tunnelClose(err) + } +} + +func (s *Session) clientConnect(message *message) { + conn := newConnection(message.connID, s, message.proto, message.address) + + s.Lock() + s.conns[message.connID] = conn + if PrintTunnelData { + blog.Debug("CONNECTIONS %d %d", s.sessionKey, len(s.conns)) + } + s.Unlock() + + go clientDial(s.dialer, conn, message) +} + +func (s *Session) serverConnect(deadline time.Duration, proto, address string) (net.Conn, error) { + connID := atomic.AddInt64(&s.nextConnID, 1) + conn := newConnection(connID, s, proto, address) + + s.Lock() + s.conns[connID] = conn + if PrintTunnelData { + blog.Debug("CONNECTIONS %d %d", s.sessionKey, len(s.conns)) + } + s.Unlock() + + _, err := s.writeMessage(newConnect(connID, deadline, proto, address)) + if err != nil { + s.closeConnection(connID, err) + return nil, err + } + + return conn, err +} + +func (s *Session) writeMessage(message *message) (int, error) { + if PrintTunnelData { + blog.Debug("WRITE ", message) + } + return message.WriteTo(s.conn) +} + +func (s *Session) Close() { + s.Lock() + defer s.Unlock() + + s.stopPings() + + for _, connection := range s.conns { + connection.tunnelClose(errors.New("tunnel disconnect")) + } + + s.conns = map[int64]*connection{} +} + +func (s *Session) sessionAdded(clientKey string, sessionKey int64) { + client := fmt.Sprintf("%s/%d", clientKey, sessionKey) + _, err := s.writeMessage(newAddClient(client)) + if err != nil { + s.conn.conn.Close() + } +} + +func (s *Session) sessionRemoved(clientKey string, sessionKey int64) { + client := fmt.Sprintf("%s/%d", clientKey, sessionKey) + _, err := s.writeMessage(newRemoveClient(client)) + if err != nil { + s.conn.conn.Close() + } +} diff --git a/bcs-common/common/websocketDialer/session_manager.go b/bcs-common/common/websocketDialer/session_manager.go new file mode 100644 index 0000000000..44e61a7b57 --- /dev/null +++ b/bcs-common/common/websocketDialer/session_manager.go @@ -0,0 +1,159 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "fmt" + "math/rand" + "net" + "sync" + "time" + + "bk-bcs/bcs-common/common/websocketDialer/metrics" + "github.com/gorilla/websocket" +) + +type sessionListener interface { + sessionAdded(clientKey string, sessionKey int64) + sessionRemoved(clientKey string, sessionKey int64) +} + +type sessionManager struct { + sync.Mutex + clients map[string][]*Session + peers map[string][]*Session + listeners map[sessionListener]bool +} + +func newSessionManager() *sessionManager { + return &sessionManager{ + clients: map[string][]*Session{}, + peers: map[string][]*Session{}, + listeners: map[sessionListener]bool{}, + } +} + +func toDialer(s *Session, prefix string, deadline time.Duration) Dialer { + return func(proto, address string) (net.Conn, error) { + if prefix == "" { + return s.serverConnect(deadline, proto, address) + } + return s.serverConnect(deadline, prefix+"::"+proto, address) + } +} + +func (sm *sessionManager) removeListener(listener sessionListener) { + sm.Lock() + defer sm.Unlock() + + delete(sm.listeners, listener) +} + +func (sm *sessionManager) addListener(listener sessionListener) { + sm.Lock() + defer sm.Unlock() + + sm.listeners[listener] = true + + for k, sessions := range sm.clients { + for _, session := range sessions { + listener.sessionAdded(k, session.sessionKey) + } + } + + for k, sessions := range sm.peers { + for _, session := range sessions { + listener.sessionAdded(k, session.sessionKey) + } + } +} + +func (sm *sessionManager) getDialer(clientKey string, deadline time.Duration) (Dialer, error) { + sm.Lock() + defer sm.Unlock() + + sessions := sm.clients[clientKey] + if len(sessions) > 0 { + return toDialer(sessions[0], "", deadline), nil + } + + for _, sessions := range sm.peers { + for _, session := range sessions { + session.Lock() + keys := session.remoteClientKeys[clientKey] + session.Unlock() + if len(keys) > 0 { + return toDialer(session, clientKey, deadline), nil + } + } + } + + return nil, fmt.Errorf("failed to find Session for client %s", clientKey) +} + +func (sm *sessionManager) add(clientKey string, conn *websocket.Conn, peer bool) *Session { + sessionKey := rand.Int63() + session := newSession(sessionKey, clientKey, conn) + + sm.Lock() + defer sm.Unlock() + + if peer { + sm.peers[clientKey] = append(sm.peers[clientKey], session) + } else { + sm.clients[clientKey] = append(sm.clients[clientKey], session) + } + metrics.IncSMTotalAddWS(clientKey, peer) + + for l := range sm.listeners { + l.sessionAdded(clientKey, session.sessionKey) + } + + return session +} + +func (sm *sessionManager) remove(s *Session) { + var isPeer bool + sm.Lock() + defer sm.Unlock() + + for i, store := range []map[string][]*Session{sm.clients, sm.peers} { + var newSessions []*Session + + for _, v := range store[s.clientKey] { + if v.sessionKey == s.sessionKey { + if i == 0 { + isPeer = false + } else { + isPeer = true + } + metrics.IncSMTotalRemoveWS(s.clientKey, isPeer) + continue + } + newSessions = append(newSessions, v) + } + + if len(newSessions) == 0 { + delete(store, s.clientKey) + } else { + store[s.clientKey] = newSessions + } + } + + for l := range sm.listeners { + l.sessionRemoved(s.clientKey, s.sessionKey) + } + + s.Close() +} diff --git a/bcs-common/common/websocketDialer/types.go b/bcs-common/common/websocketDialer/types.go new file mode 100644 index 0000000000..ed2151b31f --- /dev/null +++ b/bcs-common/common/websocketDialer/types.go @@ -0,0 +1,23 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import "time" + +const ( + PingWaitDuration = 60 * time.Second + PingWriteInterval = 5 * time.Second + MaxRead = 8192 + HandshakeTimeOut = 10 * time.Second +) diff --git a/bcs-common/common/websocketDialer/wsconn.go b/bcs-common/common/websocketDialer/wsconn.go new file mode 100644 index 0000000000..7a06582ac1 --- /dev/null +++ b/bcs-common/common/websocketDialer/wsconn.go @@ -0,0 +1,59 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package websocketDialer + +import ( + "io" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type wsConn struct { + sync.Mutex + conn *websocket.Conn +} + +func newWSConn(conn *websocket.Conn) *wsConn { + w := &wsConn{ + conn: conn, + } + w.setupDeadline() + return w +} + +func (w *wsConn) WriteMessage(messageType int, data []byte) error { + w.Lock() + defer w.Unlock() + w.conn.SetWriteDeadline(time.Now().Add(PingWaitDuration)) + return w.conn.WriteMessage(messageType, data) +} + +func (w *wsConn) NextReader() (int, io.Reader, error) { + return w.conn.NextReader() +} + +func (w *wsConn) setupDeadline() { + w.conn.SetReadDeadline(time.Now().Add(PingWaitDuration)) + w.conn.SetPingHandler(func(string) error { + w.Lock() + w.conn.WriteControl(websocket.PongMessage, []byte(""), time.Now().Add(time.Second)) + w.Unlock() + return w.conn.SetReadDeadline(time.Now().Add(PingWaitDuration)) + }) + w.conn.SetPongHandler(func(string) error { + return w.conn.SetReadDeadline(time.Now().Add(PingWaitDuration)) + }) +} diff --git a/bcs-services/bcs-api/config/config.go b/bcs-services/bcs-api/config/config.go index 1fe878e7f1..db89babdec 100644 --- a/bcs-services/bcs-api/config/config.go +++ b/bcs-services/bcs-api/config/config.go @@ -58,6 +58,7 @@ type ApiServConfig struct { TKE options.TKEOptions Edition string MesosWebconsoleProxyPort uint + PeerToken string } var ( diff --git a/bcs-services/bcs-api/main.go b/bcs-services/bcs-api/main.go index ac3593742e..950cacf747 100644 --- a/bcs-services/bcs-api/main.go +++ b/bcs-services/bcs-api/main.go @@ -116,6 +116,7 @@ func parseConfig(op *options.ServerOption) *config.ApiServConfig { config.ClusterCredentialsFixtures = apiServConfig.BKE.ClusterCredentialsFixtures config.MesosWebconsoleProxyPort = apiServConfig.MesosWebconsoleProxyPort config.TkeConf = op.TKE + apiServConfig.PeerToken = op.PeerToken //server cert directory if op.CertConfig.ServerCertFile != "" && op.CertConfig.ServerKeyFile != "" { diff --git a/bcs-services/bcs-api/options/options.go b/bcs-services/bcs-api/options/options.go index ee5a30ea30..4e5d539cc8 100644 --- a/bcs-services/bcs-api/options/options.go +++ b/bcs-services/bcs-api/options/options.go @@ -41,6 +41,8 @@ type ServerOption struct { MesosWebconsoleProxyPort uint `json:"mesos_webconsole_proxy_port" value:"8083" usage:"Port to connect to mesos webconsole proxy"` TKE TKEOptions `json:"tke"` + + PeerToken string `json:"peer_token" value:"" usage:"peer token to auth with each other, only used to websocket peer"` } type BKEOptions struct { diff --git a/bcs-services/bcs-api/pkg/models/cluster.go b/bcs-services/bcs-api/pkg/models/cluster.go index 055fead7af..dd0e051a1b 100644 --- a/bcs-services/bcs-api/pkg/models/cluster.go +++ b/bcs-services/bcs-api/pkg/models/cluster.go @@ -69,3 +69,14 @@ type RegisterToken struct { Token string `json:"token" gorm:"size:256"` CreatedAt time.Time `json:"created_at"` } + +type WsClusterCredentials struct { + ID uint `gorm:"primary_key"` + ServerKey string `gorm:"unique;not null"` + ClientModule string `gorm:"not null"` + ServerAddress string `gorm:"size:2048"` + CaCertData string `gorm:"size:4096"` + UserToken string `gorm:"size:2048"` + CreatedAt time.Time + UpdatedAt time.Time +} diff --git a/bcs-services/bcs-api/pkg/server/proxier/proxier.go b/bcs-services/bcs-api/pkg/server/proxier/proxier.go index a35cf19d66..87dc7db2f4 100644 --- a/bcs-services/bcs-api/pkg/server/proxier/proxier.go +++ b/bcs-services/bcs-api/pkg/server/proxier/proxier.go @@ -145,6 +145,44 @@ func (f *ReverseProxyDispatcher) ServeHTTP(rw http.ResponseWriter, req *http.Req return } + var proxyHandler *ClusterHandlerInstance + // 先从websocket dialer缓存中查找websocket链 + websocketHandler, found, err := lookupWsHandler(clusterId, req) + if err != nil { + blog.Errorf("error when lookup websocket conn: %s", err.Error()) + err := fmt.Errorf("error when lookup websocket conn: %s", err.Error()) + status := utils.NewInternalError(err) + status.ErrStatus.Reason = "CREATE_TUNNEL_ERROR" + utils.WriteKubeAPIError(rw, status) + return + } + if found { + blog.Info("found websocket conn for cluster %s", clusterId) + handlerServer := stripLeaveSlash(f.ExtractPathPrefix(req), websocketHandler) + proxyHandler = &ClusterHandlerInstance{ + Handler: handlerServer, + } + } else { + // Try not to initialize the handler everytime by using a map to store all the initialized handler + // Use RWLock to fix race condition + f.handlerMutateLock.Lock() + if f.handlerStore[clusterId] == nil { + handlerServer, err := f.InitializeHandlerForCluster(clusterId, externalClusterInfo, req) + if err != nil { + err = fmt.Errorf("error when creating proxy channel: %s", err.Error()) + status := utils.NewInternalError(err) + status.ErrStatus.Reason = "CREATE_TUNNEL_ERROR" + utils.WriteKubeAPIError(rw, status) + f.handlerMutateLock.Unlock() + return + } + f.handlerStore[clusterId] = handlerServer + } + proxyHandler = f.handlerStore[clusterId] + f.handlerMutateLock.Unlock() + } + + blog.Info(req.Header.Get("Authorization")) // Delete the original auth header so that the original user token won't be passed to the rev-proxy request and // damage the real cluster authentication process. delete(req.Header, "Authorization") @@ -162,40 +200,6 @@ func (f *ReverseProxyDispatcher) ServeHTTP(rw http.ResponseWriter, req *http.Req // bke-server instance? req.URL.Scheme = "https" - // Try not to initialize the handler everytime by using a map to store all the initialized handler - f.handlerMutateLock.RLock() - existedHander := f.handlerStore[clusterId] - f.handlerMutateLock.RUnlock() - if existedHander != nil { - if websocket.IsWebSocketUpgrade(req) { - metric.RequestCount.WithLabelValues("k8s_native", "websocket").Inc() - metric.RequestLatency.WithLabelValues("k8s_native", "websocket").Observe(time.Since(start).Seconds()) - } - existedHander.Handler.ServeHTTP(rw, req) - if !websocket.IsWebSocketUpgrade(req) { - metric.RequestCount.WithLabelValues("k8s_native", req.Method).Inc() - metric.RequestLatency.WithLabelValues("k8s_native", req.Method).Observe(time.Since(start).Seconds()) - } - return - } - - // Use RWLock to fix race condition - f.handlerMutateLock.Lock() - if f.handlerStore[clusterId] == nil { - handlerServer, err := f.InitializeHandlerForCluster(clusterId, externalClusterInfo, req) - if err != nil { - err = fmt.Errorf("error when creating proxy channel: %s", err.Error()) - status := utils.NewInternalError(err) - status.ErrStatus.Reason = "CREATE_TUNNEL_ERROR" - utils.WriteKubeAPIError(rw, status) - f.handlerMutateLock.Unlock() - return - } - f.handlerStore[clusterId] = handlerServer - } - f.handlerMutateLock.Unlock() - - proxyHandler := f.handlerStore[clusterId] if websocket.IsWebSocketUpgrade(req) { metric.RequestCount.WithLabelValues("k8s_native", "websocket").Inc() metric.RequestLatency.WithLabelValues("k8s_native", "websocket").Observe(time.Since(start).Seconds()) @@ -230,6 +234,7 @@ func (f *ReverseProxyDispatcher) InitializeUpstreamServer(clusterId string, serv // other cases when we may also need to re-establish the apiserver connection. This includes apiserver connection // failure or apiserver addresses's major changes. func (f *ReverseProxyDispatcher) InitializeHandlerForCluster(clusterId string, externalClusterInfo *m.BCSClusterInfo, req *http.Request) (*ClusterHandlerInstance, error) { + // Query for the cluster credentials clusterCredentials := f.GetClusterCredentials(clusterId) if clusterCredentials == nil || clusterCredentials.ServerAddresses == "" { diff --git a/bcs-services/bcs-api/pkg/server/proxier/websocket_dial.go b/bcs-services/bcs-api/pkg/server/proxier/websocket_dial.go new file mode 100644 index 0000000000..05b0893ce1 --- /dev/null +++ b/bcs-services/bcs-api/pkg/server/proxier/websocket_dial.go @@ -0,0 +1,81 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package proxier + +import ( + "crypto/tls" + "crypto/x509" + "net/http" + "net/url" + "strings" + "time" + + m "bk-bcs/bcs-services/bcs-api/pkg/models" + "bk-bcs/bcs-services/bcs-api/pkg/storages/sqlstore" + "bk-bcs/bcs-services/bcs-api/tunnel" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/client-go/transport" +) + +// lookupWsHandler will lookup websocket dialer in cache +func lookupWsHandler(clusterId string, req *http.Request) (*proxy.UpgradeAwareHandler, bool, error) { + credentials := sqlstore.GetWsCredentials(clusterId) + if credentials == nil { + return nil, false, nil + } + + serverAddress := credentials.ServerAddress + if !strings.HasSuffix(serverAddress, "/") { + serverAddress = serverAddress + "/" + } + u, err := url.Parse(serverAddress) + if err != nil { + return nil, false, err + } + + transport := getTransport(clusterId, credentials) + if transport == nil { + return nil, false, nil + } + + responder := &responder{} + proxyHandler := proxy.NewUpgradeAwareHandler(u, transport, true, false, responder) + proxyHandler.UseRequestLocation = true + + return proxyHandler, true, nil +} + +func getTransport(clusterId string, credentials *m.WsClusterCredentials) http.RoundTripper { + tp := &http.Transport{} + if credentials.CaCertData != "" { + certs := x509.NewCertPool() + caCrt := []byte(credentials.CaCertData) + certs.AppendCertsFromPEM(caCrt) + tp.TLSClientConfig = &tls.Config{ + RootCAs: certs, + } + } + + tunnelServer := tunnel.DefaultTunnelServer + if tunnelServer.HasSession(clusterId) { + cd := tunnelServer.Dialer(clusterId, 15*time.Second) + tp.Dial = cd + bearerToken := credentials.UserToken + bearerAuthRoundTripper := transport.NewBearerAuthRoundTripper(bearerToken, tp) + + return bearerAuthRoundTripper + } + + return nil +} diff --git a/bcs-services/bcs-api/pkg/server/server.go b/bcs-services/bcs-api/pkg/server/server.go index 72ba377436..36eb92b95a 100644 --- a/bcs-services/bcs-api/pkg/server/server.go +++ b/bcs-services/bcs-api/pkg/server/server.go @@ -41,6 +41,7 @@ func Setup(conf *config.ApiServConfig) { &m.Cluster{}, &m.ClusterCredentials{}, &m.RegisterToken{}, + &m.WsClusterCredentials{}, // BCS &m.BCSClusterInfo{}, diff --git a/bcs-services/bcs-api/pkg/storages/sqlstore/credentials.go b/bcs-services/bcs-api/pkg/storages/sqlstore/credentials.go index d380ce85d1..ab8ba25ea2 100644 --- a/bcs-services/bcs-api/pkg/storages/sqlstore/credentials.go +++ b/bcs-services/bcs-api/pkg/storages/sqlstore/credentials.go @@ -41,3 +41,40 @@ func SaveCredentials(clusterId, serverAddresses, caCertData, userToken, clusterD ).FirstOrCreate(&credentials) return dbScoped.Error } + +// SaveWsCredentials saves the credentials of cluster registered by websocket +func SaveWsCredentials(serverKey, clientModule, serverAddress, caCertData, userToken string) error { + var credentials m.WsClusterCredentials + // Create or update, source: https://github.com/jinzhu/gorm/issues/1307 + dbScoped := GCoreDB.Where(m.WsClusterCredentials{ServerKey: serverKey}).Assign( + m.WsClusterCredentials{ + ClientModule: clientModule, + ServerAddress: serverAddress, + CaCertData: caCertData, + UserToken: userToken, + }, + ).FirstOrCreate(&credentials) + return dbScoped.Error +} + +// GetWsCredentials query for clusterCredentials of cluster registered by websocket +func GetWsCredentials(serverKey string) *m.WsClusterCredentials { + credentials := m.WsClusterCredentials{} + GCoreDB.Where(&m.WsClusterCredentials{ServerKey: serverKey}).First(&credentials) + if credentials.ID != 0 { + return &credentials + } + return nil +} + +func DelWsCredentials(serverKey string) { + credentials := m.WsClusterCredentials{} + GCoreDB.Where(&m.WsClusterCredentials{ServerKey: serverKey}).Delete(&credentials) +} + +func GetWsCredentialsByClusterId(clusterId string) []*m.WsClusterCredentials { + var credentials []*m.WsClusterCredentials + query := clusterId + "-%" + GCoreDB.Where("server_key LIKE ?", query).Find(&credentials) + return credentials +} diff --git a/bcs-services/bcs-api/processor/http/actions/v4http/k8s/k8s-actions.go b/bcs-services/bcs-api/processor/http/actions/v4http/k8s/k8s-actions.go index 1f09f0003a..464ef0c428 100644 --- a/bcs-services/bcs-api/processor/http/actions/v4http/k8s/k8s-actions.go +++ b/bcs-services/bcs-api/processor/http/actions/v4http/k8s/k8s-actions.go @@ -26,6 +26,7 @@ import ( "bk-bcs/bcs-common/common/types" "bk-bcs/bcs-services/bcs-api/metric" "bk-bcs/bcs-services/bcs-api/processor/http/actions" + "bk-bcs/bcs-services/bcs-api/processor/http/actions/v4http/utils" "bk-bcs/bcs-services/bcs-api/regdiscv" restful "github.com/emicklei/go-restful" @@ -65,6 +66,10 @@ func request2k8sapi(req *restful.Request, uri, method string) (string, error) { return err1.Error(), nil } + httpcli := httpclient.NewHttpClient() + httpcli.SetHeader("Content-Type", "application/json") + httpcli.SetHeader("Accept", "application/json") + rd, err := regdiscv.GetRDiscover() if err != nil { metric.RequestErrorCount.WithLabelValues("k8s_driver", method).Inc() @@ -74,45 +79,57 @@ func request2k8sapi(req *restful.Request, uri, method string) (string, error) { return err1.Error(), nil } - serv, err := rd.GetModuleServers(fmt.Sprintf("%s/%s", types.BCS_MODULE_K8SAPISERVER, cluster)) - if err != nil { - metric.RequestErrorCount.WithLabelValues("k8s_driver", method).Inc() - metric.RequestErrorLatency.WithLabelValues("k8s_driver", method).Observe(time.Since(start).Seconds()) - blog.Error("get cluster %s servers %s error %s", cluster, types.BCS_MODULE_K8SAPISERVER, err.Error()) - err1 := bhttp.InternalError(common.BcsErrApiGetK8sApiFail, fmt.Sprintf("k8s cluster %s not found", cluster)) - return err1.Error(), nil - } + var url string + // 先从websocket dialer缓存中查找websocket链 + serverAddr, tp, found := utils.LookupWsHandler(cluster) + if found { + url = fmt.Sprintf("%s/k8sdriver/v4/%s", serverAddr, uri) + if strings.HasPrefix(serverAddr, "https") { + cliTls, err := rd.GetClientTls() + if err != nil { + blog.Errorf("get client tls error %s", err.Error()) + } + tp.TLSClientConfig = cliTls + } + httpcli.SetTransPort(tp) + } else { + serv, err := rd.GetModuleServers(fmt.Sprintf("%s/%s", types.BCS_MODULE_K8SAPISERVER, cluster)) + if err != nil { + metric.RequestErrorCount.WithLabelValues("k8s_driver", method).Inc() + metric.RequestErrorLatency.WithLabelValues("k8s_driver", method).Observe(time.Since(start).Seconds()) + blog.Error("get cluster %s servers %s error %s", cluster, types.BCS_MODULE_K8SAPISERVER, err.Error()) + err1 := bhttp.InternalError(common.BcsErrApiGetK8sApiFail, fmt.Sprintf("k8s cluster %s not found", cluster)) + return err1.Error(), nil + } - ser, ok := serv.(*types.BcsK8sApiserverInfo) - if !ok { - metric.RequestErrorCount.WithLabelValues("k8s_driver", method).Inc() - metric.RequestErrorLatency.WithLabelValues("k8s_driver", method).Observe(time.Since(start).Seconds()) - blog.Errorf("servers convert to BcsK8sApiserverInfo") - err1 := bhttp.InternalError(common.BcsErrApiGetK8sApiFail, common.BcsErrApiGetK8sApiFailStr) - return err1.Error(), nil - } + ser, ok := serv.(*types.BcsK8sApiserverInfo) + if !ok { + metric.RequestErrorCount.WithLabelValues("k8s_driver", method).Inc() + metric.RequestErrorLatency.WithLabelValues("k8s_driver", method).Observe(time.Since(start).Seconds()) + blog.Errorf("servers convert to BcsK8sApiserverInfo") + err1 := bhttp.InternalError(common.BcsErrApiGetK8sApiFail, common.BcsErrApiGetK8sApiFailStr) + return err1.Error(), nil + } - //host := servInfo.Scheme + "://" + servInfo.IP + ":" + strconv.Itoa(int(servInfo.Port)) - var host string - if ser.ExternalIp != "" && ser.ExternalPort != 0 { - host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.ExternalIp, ser.ExternalPort) - } else { - host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.IP, ser.Port) - } + //host := servInfo.Scheme + "://" + servInfo.IP + ":" + strconv.Itoa(int(servInfo.Port)) + var host string + if ser.ExternalIp != "" && ser.ExternalPort != 0 { + host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.ExternalIp, ser.ExternalPort) + } else { + host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.IP, ser.Port) + } - //url := routeHost + "/api/v1/" + uri //a.Conf.BcsRoute - url := fmt.Sprintf("%s/k8sdriver/v4/%s", host, uri) - blog.V(3).Infof("do request to url(%s), method(%s)", url, method) + //url := routeHost + "/api/v1/" + uri //a.Conf.BcsRoute + url = fmt.Sprintf("%s/k8sdriver/v4/%s", host, uri) + blog.V(3).Infof("do request to url(%s), method(%s)", url, method) - httpcli := httpclient.NewHttpClient() - httpcli.SetHeader("Content-Type", "application/json") - httpcli.SetHeader("Accept", "application/json") - if strings.ToLower(ser.Scheme) == "https" { - cliTls, err := rd.GetClientTls() - if err != nil { - blog.Errorf("get client tls error %s", err.Error()) + if strings.ToLower(ser.Scheme) == "https" { + cliTls, err := rd.GetClientTls() + if err != nil { + blog.Errorf("get client tls error %s", err.Error()) + } + httpcli.SetTlsVerityConfig(cliTls) } - httpcli.SetTlsVerityConfig(cliTls) } reply, err := httpcli.Request(url, method, req.Request.Header, data) diff --git a/bcs-services/bcs-api/processor/http/actions/v4http/mesos/mesos-actions.go b/bcs-services/bcs-api/processor/http/actions/v4http/mesos/mesos-actions.go index d080fc9d71..ff58601b13 100644 --- a/bcs-services/bcs-api/processor/http/actions/v4http/mesos/mesos-actions.go +++ b/bcs-services/bcs-api/processor/http/actions/v4http/mesos/mesos-actions.go @@ -26,6 +26,7 @@ import ( "bk-bcs/bcs-common/common/types" "bk-bcs/bcs-services/bcs-api/metric" "bk-bcs/bcs-services/bcs-api/processor/http/actions" + "bk-bcs/bcs-services/bcs-api/processor/http/actions/v4http/utils" "bk-bcs/bcs-services/bcs-api/regdiscv" "github.com/emicklei/go-restful" @@ -83,6 +84,10 @@ func request2mesosapi(req *restful.Request, uri, method string) (string, error) return err1.Error(), nil } + httpcli := httpclient.NewHttpClient() + httpcli.SetHeader(medieTypeHeader, "application/json") + httpcli.SetHeader("Accept", "application/json") + rd, err := regdiscv.GetRDiscover() if err != nil { metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() @@ -92,46 +97,59 @@ func request2mesosapi(req *restful.Request, uri, method string) (string, error) return err1.Error(), nil } - serv, err := rd.GetModuleServers(fmt.Sprintf("%s/%s", types.BCS_MODULE_MESOSAPISERVER, cluster)) - if err != nil { - metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() - metric.RequestErrorLatency.WithLabelValues("mesos", method).Observe(time.Since(start).Seconds()) - blog.Error("get cluster %s servers %s error %s", cluster, types.BCS_MODULE_MESOSAPISERVER, err.Error()) - err1 := bhttp.InternalError(common.BcsErrApiGetMesosApiFail, fmt.Sprintf("mesos cluster %s not found", cluster)) - return err1.Error(), nil - } - - ser, ok := serv.(*types.BcsMesosApiserverInfo) - if !ok { - metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() - metric.RequestErrorLatency.WithLabelValues("mesos", method).Observe(time.Since(start).Seconds()) - blog.Errorf("servers convert to BcsMesosApiserverInfo") - err1 := bhttp.InternalError(common.BcsErrApiGetMesosApiFail, common.BcsErrApiGetMesosApiFailStr) - return err1.Error(), nil - } - - //host := servInfo.Scheme + "://" + servInfo.IP + ":" + strconv.Itoa(int(servInfo.Port)) - var host string - if ser.ExternalIp != "" && ser.ExternalPort != 0 { - host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.ExternalIp, ser.ExternalPort) + var url string + // 先从websocket dialer缓存中查找websocket链 + serverAddr, tp, found := utils.LookupWsHandler(cluster) + if found { + url = fmt.Sprintf("%s/mesosdriver/v4/%s", serverAddr, uri) + if strings.HasPrefix(serverAddr, "https") { + cliTls, err := rd.GetClientTls() + if err != nil { + blog.Errorf("get client tls error %s", err.Error()) + } + tp.TLSClientConfig = cliTls + } + httpcli.SetTransPort(tp) } else { - host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.IP, ser.Port) - } - //url := routeHost + "/api/v1/" + uri //a.Conf.BcsRoute - url := fmt.Sprintf("%s/mesosdriver/v4/%s", host, uri) - blog.V(3).Infof("do request to url(%s), method(%s)", url, method) - - httpcli := httpclient.NewHttpClient() - httpcli.SetHeader(medieTypeHeader, "application/json") - httpcli.SetHeader("Accept", "application/json") - if strings.ToLower(ser.Scheme) == "https" { - cliTls, err := rd.GetClientTls() + serv, err := rd.GetModuleServers(fmt.Sprintf("%s/%s", types.BCS_MODULE_MESOSAPISERVER, cluster)) if err != nil { - blog.Errorf("get client tls error %s", err.Error()) + metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() + metric.RequestErrorLatency.WithLabelValues("mesos", method).Observe(time.Since(start).Seconds()) + blog.Error("get cluster %s servers %s error %s", cluster, types.BCS_MODULE_MESOSAPISERVER, err.Error()) + err1 := bhttp.InternalError(common.BcsErrApiGetMesosApiFail, fmt.Sprintf("mesos cluster %s not found", cluster)) + return err1.Error(), nil + } + + ser, ok := serv.(*types.BcsMesosApiserverInfo) + if !ok { + metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() + metric.RequestErrorLatency.WithLabelValues("mesos", method).Observe(time.Since(start).Seconds()) + blog.Errorf("servers convert to BcsMesosApiserverInfo") + err1 := bhttp.InternalError(common.BcsErrApiGetMesosApiFail, common.BcsErrApiGetMesosApiFailStr) + return err1.Error(), nil + } + + //host := servInfo.Scheme + "://" + servInfo.IP + ":" + strconv.Itoa(int(servInfo.Port)) + var host string + if ser.ExternalIp != "" && ser.ExternalPort != 0 { + host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.ExternalIp, ser.ExternalPort) + } else { + host = fmt.Sprintf("%s://%s:%d", ser.Scheme, ser.IP, ser.Port) + } + //url := routeHost + "/api/v1/" + uri //a.Conf.BcsRoute + url = fmt.Sprintf("%s/mesosdriver/v4/%s", host, uri) + blog.V(3).Infof("do request to url(%s), method(%s)", url, method) + + if strings.ToLower(ser.Scheme) == "https" { + cliTls, err := rd.GetClientTls() + if err != nil { + blog.Errorf("get client tls error %s", err.Error()) + } + httpcli.SetTlsVerityConfig(cliTls) } - httpcli.SetTlsVerityConfig(cliTls) } + blog.Info(url) reply, err := httpcli.Request(url, method, req.Request.Header, data) if err != nil { metric.RequestErrorCount.WithLabelValues("mesos", method).Inc() diff --git a/bcs-services/bcs-api/processor/http/actions/v4http/utils/util.go b/bcs-services/bcs-api/processor/http/actions/v4http/utils/util.go new file mode 100644 index 0000000000..b905268f36 --- /dev/null +++ b/bcs-services/bcs-api/processor/http/actions/v4http/utils/util.go @@ -0,0 +1,62 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package utils + +import ( + "math/rand" + "net/http" + "time" + + "bk-bcs/bcs-common/common/blog" + "bk-bcs/bcs-services/bcs-api/pkg/storages/sqlstore" + "bk-bcs/bcs-services/bcs-api/tunnel" +) + +// LookupWsHandler will lookup websocket dialer in cache +func LookupWsHandler(clusterId string) (string, *http.Transport, bool) { + cluster := sqlstore.GetClusterByBCSInfo("", clusterId) + if cluster == nil { + return "", nil, false + } + credentials := sqlstore.GetWsCredentialsByClusterId(cluster.ID) + if len(credentials) == 0 { + return "", nil, false + } + + for _, credential := range credentials { + blog.Info(credential.ServerKey) + } + + rand.Shuffle(len(credentials), func(i, j int) { + credentials[i], credentials[j] = credentials[j], credentials[i] + }) + + for _, credential := range credentials { + blog.Info(credential.ServerKey) + } + + tunnelServer := tunnel.DefaultTunnelServer + for _, credential := range credentials { + clientKey := credential.ServerKey + serverAddress := credential.ServerAddress + if tunnelServer.HasSession(clientKey) { + blog.Infof("found sesseion: %s", clientKey) + tp := &http.Transport{} + cd := tunnelServer.Dialer(clientKey, 15*time.Second) + tp.Dial = cd + return serverAddress, tp, true + } + } + return "", nil, false +} diff --git a/bcs-services/bcs-api/processor/server.go b/bcs-services/bcs-api/processor/server.go index f4a597f030..cfaf7a77a9 100644 --- a/bcs-services/bcs-api/processor/server.go +++ b/bcs-services/bcs-api/processor/server.go @@ -25,6 +25,7 @@ import ( "bk-bcs/bcs-services/bcs-api/pkg/server/resthdrs" "bk-bcs/bcs-services/bcs-api/processor/http/actions" "bk-bcs/bcs-services/bcs-api/processor/http/actions/v4http/mesos/webconsole" + "bk-bcs/bcs-services/bcs-api/tunnel" ) type Processor struct { @@ -57,10 +58,20 @@ func (p *Processor) Start() error { blog.Errorf("new filter failed: %v", err) os.Exit(1) } + proxier.DefaultReverseProxyDispatcher.Initialize() + + tunnelServer := tunnel.NewTunnelServer() + err = tunnel.StartPeerManager(p.config, tunnelServer) + if err != nil { + blog.Errorf("failed to start peermanager: %s", err.Error()) + return err + } + p.httpServ.RegisterWebServer("", generalFilter.Filter, actions.GetApiAction()) router := p.httpServ.GetRouter() webContainer := p.httpServ.GetWebContainer() + router.Handle("/bcsapi/v1/websocket/connect", tunnelServer) router.Handle("/bcsapi/v1/webconsole/{sub_path:.*}", webconsole.NewWebconsoleProxy(p.config.ClientCert)) //mesos clueter api forwarding router.Handle("/bcsapi/{sub_path:.*}", webContainer) diff --git a/bcs-services/bcs-api/tunnel/peermanager.go b/bcs-services/bcs-api/tunnel/peermanager.go new file mode 100644 index 0000000000..6f2d502cfb --- /dev/null +++ b/bcs-services/bcs-api/tunnel/peermanager.go @@ -0,0 +1,166 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "sync" + "time" + + "bk-bcs/bcs-common/common/RegisterDiscover" + "bk-bcs/bcs-common/common/blog" + "bk-bcs/bcs-common/common/types" + "bk-bcs/bcs-common/common/websocketDialer" + "bk-bcs/bcs-services/bcs-api/config" + "golang.org/x/net/context" +) + +const ( + defaultPeerToken = "Mx9vWfTZea4MEzc7SlvB8aFl0NhmYQvZzEomOYypDMKkev34Q9kIyh32RjXXCIcn" +) + +type peerManager struct { + sync.Mutex + ready bool + token string + urlFormat string + server *websocketDialer.Server + peers map[string]bool +} + +type PeerRDiscover struct { + rd *RegisterDiscover.RegDiscover + rootCxt context.Context + cancel context.CancelFunc +} + +func StartPeerManager(conf *config.ApiServConfig, dialerServer *websocketDialer.Server) error { + dialerServer.PeerID = fmt.Sprintf("%s:%d", conf.LocalIp, conf.Port) + dialerServer.PeerToken = conf.PeerToken + if dialerServer.PeerToken == "" { + dialerServer.PeerToken = defaultPeerToken + blog.Info("use default peer token: [%s]", dialerServer.PeerToken) + } + pm := &peerManager{ + token: dialerServer.PeerToken, + urlFormat: "wss://%s/bcsapi/v1/websocket/connect", + server: dialerServer, + peers: map[string]bool{}, + } + + peerRd := &PeerRDiscover{ + rd: RegisterDiscover.NewRegDiscoverEx(conf.RegDiscvSrv, 10*time.Second), + } + peerRd.rootCxt, peerRd.cancel = context.WithCancel(context.Background()) + + if err := peerRd.rd.Start(); err != nil { + blog.Error("fail to start register and discover bcs-api peers. err:%s", err.Error()) + return err + } + + go peerRd.discoveryAndWatchPeer(pm) + + return nil +} + +func (p *PeerRDiscover) discoveryAndWatchPeer(pm *peerManager) { + key := fmt.Sprintf("%s/%s", types.BCS_SERV_BASEPATH, types.BCS_MODULE_APISERVER) + blog.Infof("start discover service key %s", key) + event, err := p.rd.DiscoverService(key) + if err != nil { + blog.Error("fail to register discover for api. err:%s", err.Error()) + p.cancel() + os.Exit(1) + } + + for { + select { + case eve := <-event: + var peerServs []string + for _, serv := range eve.Server { + apiServ := new(types.APIServInfo) + if err := json.Unmarshal([]byte(serv), apiServ); err != nil { + blog.Warn("fail to do json unmarshal(%s), err:%s", serv, err.Error()) + continue + } + peerServ := fmt.Sprintf("%s:%d", apiServ.IP, apiServ.Port) + peerServs = append(peerServs, peerServ) + } + + err := pm.syncPeers(peerServs) + if err != nil { + blog.Errorf("failed to discovery and watch peers: %s", err.Error()) + } + case <-p.rootCxt.Done(): + blog.Warn("zk register path %s and discover done", key) + return + } + } +} + +func (p *peerManager) syncPeers(servs []string) error { + if len(servs) == 0 { + return errors.New("syncPeers even can't discovery self") + } + + p.addRemovePeers(servs) + + return nil +} + +func (p *peerManager) addRemovePeers(servs []string) { + p.Lock() + defer p.Unlock() + + newSet := map[string]bool{} + ready := false + + for _, serv := range servs { + if serv == p.server.PeerID { + ready = true + } else { + newSet[serv] = true + } + } + + toCreate, toDelete, _ := diff(newSet, p.peers) + for _, peerServ := range toCreate { + p.server.AddPeer(fmt.Sprintf(p.urlFormat, peerServ), peerServ, p.token) + } + for _, ip := range toDelete { + p.server.RemovePeer(ip) + } + + p.peers = newSet + p.ready = ready +} + +func diff(desired, actual map[string]bool) (toCreate []string, toDelete []string, same []string) { + for key := range desired { + if actual[key] { + same = append(same, key) + } else { + toCreate = append(toCreate, key) + } + } + for key := range actual { + if !desired[key] { + toDelete = append(toDelete, key) + } + } + return +} diff --git a/bcs-services/bcs-api/tunnel/tunnel-server.go b/bcs-services/bcs-api/tunnel/tunnel-server.go new file mode 100644 index 0000000000..e7c8e8c846 --- /dev/null +++ b/bcs-services/bcs-api/tunnel/tunnel-server.go @@ -0,0 +1,153 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "encoding/base64" + "encoding/json" + "errors" + "net/http" + "net/url" + + "bk-bcs/bcs-common/common/blog" + "bk-bcs/bcs-common/common/websocketDialer" + "bk-bcs/bcs-services/bcs-api/pkg/storages/sqlstore" +) + +const ( + Module = "BCS-API-Tunnel-Module" + RegisterToken = "BCS-API-Tunnel-Token" + Params = "BCS-API-Tunnel-Params" + Cluster = "BCS-API-Tunnel-ClusterId" + + KubeAgentModule = "kube-agent" + K8sDriverModule = "k8s-driver" + MesosDriverModule = "mesos-driver" +) + +var ( + DefaultTunnelServer *websocketDialer.Server + errFailedAuth = errors.New("failed authentication") +) + +type RegisterCluster struct { + Address string `json:"address"` + UserToken string `json:"userToken"` + CACert string `json:"caCert"` +} + +// authorizeTunnel authorize an client +func authorizeTunnel(req *http.Request) (string, bool, error) { + moduleName := req.Header.Get(Module) + if moduleName == "" { + return "", false, errors.New("module empty") + } + + registerToken := req.Header.Get(RegisterToken) + if registerToken == "" { + return "", false, errors.New("registerToken empty") + } + + clusterId := req.Header.Get(Cluster) + if clusterId == "" { + return "", false, errors.New("clusterId empty") + } + + var registerCluster RegisterCluster + params := req.Header.Get(Params) + bytes, err := base64.StdEncoding.DecodeString(params) + if err != nil { + blog.Errorf("error when decode cluster params registered by websocket: %s", err.Error()) + return "", false, err + } + if err := json.Unmarshal(bytes, ®isterCluster); err != nil { + blog.Errorf("error when unmarshal cluster params registered by websocket: %s", err.Error()) + return "", false, err + } + + if registerCluster.Address == "" { + return "", false, errors.New("client dialer address is empty") + } + + if moduleName == KubeAgentModule { + if registerCluster.CACert == "" || registerCluster.UserToken == "" { + return "", false, errors.New("address or cacert or token empty") + } + } + + var caCert string + if registerCluster.CACert != "" { + certBytes, err := base64.StdEncoding.DecodeString(registerCluster.CACert) + if err != nil { + blog.Errorf("error when decode cluster [%s] cacert registered by websocket: %s", clusterId, err.Error()) + return "", false, err + } + caCert = string(certBytes) + } + + // validate if the registerToken is correct + if moduleName == KubeAgentModule { + token := sqlstore.GetRegisterToken(clusterId) + if token == nil { + return "", false, nil + } + if token.Token != registerToken { + return "", false, nil + } + + err = sqlstore.SaveWsCredentials(clusterId, moduleName, registerCluster.Address, caCert, registerCluster.UserToken) + if err != nil { + blog.Errorf("error when save websocket credentials: %s", err.Error()) + return "", false, err + } + return clusterId, true, nil + } else if moduleName == MesosDriverModule || moduleName == K8sDriverModule { + cluster := sqlstore.GetClusterByBCSInfo("", clusterId) + if cluster == nil { + return "", false, nil + } + token := sqlstore.GetRegisterToken(cluster.ID) + if token == nil { + return "", false, nil + } + if token.Token != registerToken { + return "", false, nil + } + + url, err := url.Parse(registerCluster.Address) + if err != nil { + return "", false, nil + } + serverKey := cluster.ID + "-" + url.Host + err = sqlstore.SaveWsCredentials(serverKey, moduleName, registerCluster.Address, caCert, registerCluster.UserToken) + if err != nil { + blog.Errorf("error when save websocket credentials: %s", err.Error()) + return "", false, err + } + return serverKey, true, nil + } + + return "", false, errors.New("unknown client module") +} + +// NewTunnelServer create websocket tunnel server +func NewTunnelServer() *websocketDialer.Server { + DefaultTunnelServer = websocketDialer.New(authorizeTunnel, websocketDialer.DefaultErrorWriter, cleanCredentials) + return DefaultTunnelServer +} + +// cleanCredentials clean client credentials in db +func cleanCredentials(serverKey string) { + sqlstore.DelWsCredentials(serverKey) +}