Skip to content

Commit

Permalink
feature: bcs-api uses websocket to implement service register and dis…
Browse files Browse the repository at this point in the history
…covery. issue TencentBlueKing#412
  • Loading branch information
bryanhe-bupt committed Mar 17, 2020
1 parent dfcd451 commit cd6020a
Show file tree
Hide file tree
Showing 27 changed files with 2,339 additions and 103 deletions.
4 changes: 4 additions & 0 deletions bcs-common/common/http/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (client *HttpClient) SetTlsVerityConfig(tlsConf *tls.Config) {
client.httpCli.Transport = trans
}

func (client *HttpClient) SetTransPort(transport http.RoundTripper) {
client.httpCli.Transport = transport
}

func (client *HttpClient) NewTransPort() *http.Transport {
return &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
Expand Down
71 changes: 71 additions & 0 deletions bcs-common/common/websocketDialer/client-dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package websocketDialer

import (
"io"
"net"
"sync"
"time"
)

func clientDial(dialer Dialer, conn *connection, message *message) {
defer conn.Close()

var (
netConn net.Conn
err error
)

if dialer == nil {
netConn, err = net.DialTimeout(message.proto, message.address, time.Duration(message.deadline)*time.Millisecond)
} else {
netConn, err = dialer(message.proto, message.address)
}

if err != nil {
conn.tunnelClose(err)
return
}
defer netConn.Close()

pipe(conn, netConn)
}

func pipe(client *connection, server net.Conn) {
wg := sync.WaitGroup{}
wg.Add(1)

close := func(err error) error {
if err == nil {
err = io.EOF
}
client.doTunnelClose(err)
server.Close()
return err
}

go func() {
defer wg.Done()
_, err := io.Copy(server, client)
close(err)
}()

_, err := io.Copy(client, server)
err = close(err)
wg.Wait()

// Write tunnel error after no more I/O is happening, just incase messages get out of order
client.writeErr(err)
}
77 changes: 77 additions & 0 deletions bcs-common/common/websocketDialer/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package websocketDialer

import (
"context"
"crypto/tls"
"io/ioutil"
"net/http"
"time"

"bk-bcs/bcs-common/common/blog"
"github.com/gorilla/websocket"
)

type ConnectAuthorizer func(proto, address string) bool

func ClientConnect(ctx context.Context, wsURL string, headers http.Header, tlsConfig *tls.Config, dialer *websocket.Dialer, auth ConnectAuthorizer) {
if err := connectToProxy(ctx, wsURL, headers, tlsConfig, auth, dialer); err != nil {
time.Sleep(time.Duration(5) * time.Second)
}
}

func connectToProxy(rootCtx context.Context, proxyURL string, headers http.Header, tlsConfig *tls.Config, auth ConnectAuthorizer, dialer *websocket.Dialer) error {
blog.Infof("connecting to proxy %s", proxyURL)

if dialer == nil {
dialer = &websocket.Dialer{Proxy: http.ProxyFromEnvironment, HandshakeTimeout: HandshakeTimeOut, TLSClientConfig: tlsConfig}
}
ws, resp, err := dialer.Dial(proxyURL, headers)
if err != nil {
if resp == nil {
blog.Errorf("Failed to connect to proxy, empty dialer response")
} else {
rb, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
blog.Errorf("Failed to connect to proxy. Response status: %v - %v. Couldn't read response body (err: %v)", resp.StatusCode, resp.Status, err2)
} else {
blog.Errorf("Failed to connect to proxy. Response status: %v - %v. Response body: %s. Error: %s", resp.StatusCode, resp.Status, rb, err.Error())
}
}
return err
}
defer ws.Close()

result := make(chan error, 2)

ctx, cancel := context.WithCancel(rootCtx)
defer cancel()

session := NewClientSession(auth, ws)
defer session.Close()

go func() {
_, err = session.Serve(ctx)
result <- err
}()

select {
case <-ctx.Done():
blog.Infof("proxy %s done", proxyURL)
return nil
case err := <-result:
return err
}
}
211 changes: 211 additions & 0 deletions bcs-common/common/websocketDialer/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package websocketDialer

import (
"context"
"errors"
"io"
"net"
"sync"
"time"

"bk-bcs/bcs-common/common/websocketDialer/metrics"
)

type connection struct {
sync.Mutex

ctx context.Context
cancel func()
err error
writeDeadline time.Time
buf chan []byte
readBuf []byte
addr addr
session *Session
connID int64
}

func newConnection(connID int64, session *Session, proto, address string) *connection {
c := &connection{
addr: addr{
proto: proto,
address: address,
},
connID: connID,
session: session,
buf: make(chan []byte, 1024),
}
metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address)
return c
}

func (c *connection) tunnelClose(err error) {
metrics.IncSMTotalRemoveConnectionsForWS(c.session.clientKey, c.addr.Network(), c.addr.String())
c.writeErr(err)
c.doTunnelClose(err)
}

func (c *connection) doTunnelClose(err error) {
c.Lock()
defer c.Unlock()

if c.err != nil {
return
}

c.err = err
if c.err == nil {
c.err = io.ErrClosedPipe
}

close(c.buf)
}

func (c *connection) tunnelWriter() io.Writer {
return chanWriter{conn: c, C: c.buf}
}

func (c *connection) Close() error {
c.session.closeConnection(c.connID, io.EOF)
return nil
}

func (c *connection) copyData(b []byte) int {
n := copy(b, c.readBuf)
c.readBuf = c.readBuf[n:]
return n
}

func (c *connection) Read(b []byte) (int, error) {
if len(b) == 0 {
return 0, nil
}

n := c.copyData(b)
if n > 0 {
metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n))
return n, nil
}

next, ok := <-c.buf
if !ok {
err := io.EOF
c.Lock()
if c.err != nil {
err = c.err
}
c.Unlock()
return 0, err
}

c.readBuf = next
n = c.copyData(b)
metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n))
return n, nil
}

func (c *connection) Write(b []byte) (int, error) {
c.Lock()
if c.err != nil {
defer c.Unlock()
return 0, c.err
}
c.Unlock()

deadline := int64(0)
if !c.writeDeadline.IsZero() {
deadline = c.writeDeadline.Sub(time.Now()).Nanoseconds() / 1000000
}
msg := newMessage(c.connID, deadline, b)
metrics.AddSMTotalTransmitBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
return c.session.writeMessage(msg)
}

func (c *connection) writeErr(err error) {
if err != nil {
msg := newErrorMessage(c.connID, err)
metrics.AddSMTotalTransmitErrorBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
c.session.writeMessage(msg)
}
}

func (c *connection) LocalAddr() net.Addr {
return c.addr
}

func (c *connection) RemoteAddr() net.Addr {
return c.addr
}

func (c *connection) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
return c.SetWriteDeadline(t)
}

func (c *connection) SetReadDeadline(t time.Time) error {
return nil
}

func (c *connection) SetWriteDeadline(t time.Time) error {
c.writeDeadline = t
return nil
}

type addr struct {
proto string
address string
}

func (a addr) Network() string {
return a.proto
}

func (a addr) String() string {
return a.address
}

type chanWriter struct {
conn *connection
C chan []byte
}

func (c chanWriter) Write(buf []byte) (int, error) {
c.conn.Lock()
defer c.conn.Unlock()

if c.conn.err != nil {
return 0, c.conn.err
}

newBuf := make([]byte, len(buf))
copy(newBuf, buf)
buf = newBuf

select {
// must copy the buffer
case c.C <- buf:
return len(buf), nil
default:
select {
case c.C <- buf:
return len(buf), nil
case <-time.After(15 * time.Second):
return 0, errors.New("backed up reader")
}
}
}
Loading

0 comments on commit cd6020a

Please sign in to comment.