Skip to content

Commit

Permalink
Add SetBufferThreshold Method
Browse files Browse the repository at this point in the history
  • Loading branch information
lxzan committed Aug 15, 2024
1 parent c96b189 commit 354b5e7
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 293 deletions.
4 changes: 1 addition & 3 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ linters:
# Disable specific linter
# https://golangci-lint.run/usage/linters/#disabled-by-default
disable:
- mnd
- testpackage
- nosnakecase
- nlreturn
- gomnd
- forcetypeassert
Expand All @@ -14,15 +14,13 @@ linters:
- ineffassign
- lll
- funlen
- scopelint
- dupl
- gofumpt
- gofmt
- godot
- gci
- goimports
- gocognit
- ifshort
- gochecknoinits
- goconst
- depguard
Expand Down
22 changes: 11 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
)

// Dialer 拨号器接口
// dialer interface
// Dialer interface
type Dialer interface {
// Dial 连接到指定网络上的地址
// connects to the address on the named network
// Connects to the address on the named network
Dial(network, addr string) (c net.Conn, err error)
}

Expand All @@ -31,7 +31,7 @@ type connector struct {
}

// NewClient 创建一个新的 WebSocket 客户端连接
// creates a new WebSocket client connection
// Creates a new WebSocket client connection
func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error) {
option = initClientOption(option)
c := &connector{option: option, eventHandler: handler}
Expand Down Expand Up @@ -84,7 +84,7 @@ func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Con
return client, resp, err
}

// request 发送HTTP请求, 即WebSocket握手
// 发送HTTP请求, 即WebSocket握手
// Sends an http request, i.e., websocket handshake
func (c *connector) request() (*http.Response, *bufio.Reader, error) {
_ = c.conn.SetDeadline(time.Now().Add(c.option.HandshakeTimeout))
Expand Down Expand Up @@ -138,7 +138,7 @@ func (c *connector) request() (*http.Response, *bufio.Reader, error) {
return resp, br, err
}

// getPermessageDeflate 获取压缩拓展结果
// 获取压缩拓展结果
// Get compression expansion results
func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
serverPD := permessageNegotiation(extensions)
Expand All @@ -157,8 +157,8 @@ func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate {
return pd
}

// handshake 执行 WebSocket 握手操作
// performs the WebSocket handshake operation
// 执行 WebSocket 握手操作
// Performs the WebSocket handshake operation
func (c *connector) handshake() (*Conn, *http.Response, error) {
resp, br, err := c.request()
if err != nil {
Expand Down Expand Up @@ -202,8 +202,8 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
return socket, resp, c.conn.SetDeadline(time.Time{})
}

// getSubProtocol 从响应中获取子协议
// retrieves the subprotocol from the response
// 从响应中获取子协议
// Retrieves the subprotocol from the response
func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
a := internal.Split(c.option.RequestHeader.Get(internal.SecWebSocketProtocol.Key), ",")
b := internal.Split(resp.Header.Get(internal.SecWebSocketProtocol.Key), ",")
Expand All @@ -214,8 +214,8 @@ func (c *connector) getSubProtocol(resp *http.Response) (string, error) {
return subprotocol, nil
}

// checkHeaders 检查响应头以验证握手是否成功
// checks the response headers to verify if the handshake was successful
// 检查响应头以验证握手是否成功
// Checks the response headers to verify if the handshake was successful
func (c *connector) checkHeaders(resp *http.Response) error {
if resp.StatusCode != http.StatusSwitchingProtocols {
return ErrHandshake
Expand Down
46 changes: 23 additions & 23 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/lxzan/gws/internal"
)

// flateTail deflate压缩算法的尾部标记
// the tail marker of the deflate compression algorithm
// deflate压缩算法的尾部标记
// The tail marker of the deflate compression algorithm
var flateTail = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff}

type deflaterPool struct {
Expand All @@ -24,8 +24,8 @@ type deflaterPool struct {
pool []*deflater
}

// initialize 初始化deflaterPool
// initialize the deflaterPool
// 初始化deflaterPool
// Initialize the deflaterPool
func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflaterPool {
c.num = uint64(options.PoolSize)
for i := uint64(0); i < c.num; i++ {
Expand All @@ -35,7 +35,7 @@ func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflate
}

// Select 从deflaterPool中选择一个deflater对象
// select a deflater object from the deflaterPool
// Select a deflater object from the deflaterPool
func (c *deflaterPool) Select() *deflater {
var j = atomic.AddUint64(&c.serial, 1) & (c.num - 1)
return c.pool[j]
Expand All @@ -51,8 +51,8 @@ type deflater struct {
cpsWriter *flate.Writer
}

// initialize 初始化deflater
// initialize the deflater
// 初始化deflater
// Initialize the deflater
func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit int) *deflater {
c.dpsReader = flate.NewReader(nil)
c.dpsBuffer = bytes.NewBuffer(nil)
Expand All @@ -67,19 +67,19 @@ func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit in
return c
}

// resetFR 重置deflate reader
// reset the deflate reader
// 重置deflate reader
// Reset the deflate reader
func (c *deflater) resetFR(r io.Reader, dict []byte) {
resetter := c.dpsReader.(flate.Resetter)
_ = resetter.Reset(r, dict) // must return a null pointer
if c.dpsBuffer.Cap() > 256*1024 {
if c.dpsBuffer.Cap() > int(bufferThreshold) {
c.dpsBuffer = bytes.NewBuffer(nil)
}
c.dpsBuffer.Reset()
}

// Decompress 解压
// decompress data
// Decompress data
func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, error) {
c.dpsLocker.Lock()
defer c.dpsLocker.Unlock()
Expand All @@ -96,7 +96,7 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er
}

// Compress 压缩
// compress data
// Compress data
func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error {
c.cpsLocker.Lock()
defer c.cpsLocker.Unlock()
Expand All @@ -117,16 +117,16 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte
return nil
}

// slideWindow 滑动窗口
// sliding window
// 滑动窗口
// Sliding window
type slideWindow struct {
enabled bool
dict []byte
size int
}

// initialize 初始化滑动窗口
// initialize the sliding window
// 初始化滑动窗口
// Initialize the sliding window
func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *slideWindow {
c.enabled = true
c.size = internal.BinaryPow(windowBits)
Expand All @@ -139,7 +139,7 @@ func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *s
}

// Write 将数据写入滑动窗口
// write data to the sliding window
// Write data to the sliding window
func (c *slideWindow) Write(p []byte) (int, error) {
if !c.enabled {
return 0, nil
Expand Down Expand Up @@ -169,8 +169,8 @@ func (c *slideWindow) Write(p []byte) (int, error) {
return total, nil
}

// genRequestHeader 生成请求头
// generate request headers
// 生成请求头
// Generate request headers
func (c *PermessageDeflate) genRequestHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -191,8 +191,8 @@ func (c *PermessageDeflate) genRequestHeader() string {
return strings.Join(options, "; ")
}

// genResponseHeader 生成响应头
// generate response headers
// 生成响应头
// Generate response headers
func (c *PermessageDeflate) genResponseHeader() string {
var options = make([]string, 0, 5)
options = append(options, internal.PermessageDeflate)
Expand All @@ -211,7 +211,7 @@ func (c *PermessageDeflate) genResponseHeader() string {
return strings.Join(options, "; ")
}

// permessageNegotiation 压缩拓展协商
// 压缩拓展协商
// Negotiation of compression parameters
func permessageNegotiation(str string) PermessageDeflate {
var options = PermessageDeflate{
Expand Down Expand Up @@ -250,7 +250,7 @@ func permessageNegotiation(str string) PermessageDeflate {
return options
}

// limitReader 限制从io.Reader中最多读取m个字节
// 限制从io.Reader中最多读取m个字节
// Limit reading up to m bytes from io.Reader
func limitReader(r io.Reader, m int) io.Reader { return &limitedReader{R: r, M: m} }

Expand Down
10 changes: 5 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/lxzan/gws/internal"
)

// Conn 结构体表示一个 WebSocket 连接
// Conn struct represents a WebSocket connection
// Conn WebSocket连接
// WebSocket connection
type Conn struct {
// 互斥锁,用于保护共享资源
// Mutex to protect shared resources
Expand Down Expand Up @@ -310,9 +310,9 @@ func (c *Conn) NetConn() net.Conn {
return c.conn
}

// SetNoDelay
// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle 算法)。
// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送
// SetNoDelay 设置无延迟
// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle算法).
// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送.
// Controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm).
// The default is true (no delay), meaning that data is sent as soon as possible after a Write.
func (c *Conn) SetNoDelay(noDelay bool) error {
Expand Down
18 changes: 15 additions & 3 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@ package gws
import "github.com/lxzan/gws/internal"

var (
framePadding = frameHeader{} // 帧头填充物
binaryPool = internal.NewBufferPool(128, 256*1024) // 内存池
defaultLogger = new(stdLogger) // 默认日志工具
framePadding = frameHeader{} // 帧头填充物
defaultLogger = new(stdLogger) // 默认日志工具
bufferThreshold = uint32(256 * 1024) // buffer阈值
binaryPool = new(internal.BufferPool) // 内存池
)

func init() {
SetBufferThreshold(bufferThreshold)
}

// SetBufferThreshold 设置buffer阈值, x=pow(2,n), 超过x个字节的buffer不会被回收
// Set the buffer threshold, x=pow(2,n), that buffers larger than x bytes are not reclaimed.
func SetBufferThreshold(x uint32) {
bufferThreshold = internal.ToBinaryNumber(x)
binaryPool = internal.NewBufferPool(128, bufferThreshold)
}
72 changes: 14 additions & 58 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,21 @@ package internal
// closeErrorMap 将状态码映射到错误信息
// map status codes to error messages
var closeErrorMap = map[StatusCode]string{
// 空状态码
// Empty status code
0: "empty code",

// 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
// Normal closure; the connection was closed successfully for a purpose.
CloseNormalClosure: "close normal",

// 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
// The terminal is leaving, possibly due to a server error, or because the browser is leaving the page with an open connection.
CloseGoingAway: "client going away",

// 由于协议错误而中断连接.
// The connection was terminated due to a protocol error.
CloseProtocolError: "protocol error",

// 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
// The connection was terminated due to receiving an unsupported data type (e.g., a terminal that only receives text data received binary data).
CloseUnsupported: "unsupported data",

// 表示没有收到预期的状态码.
// Indicates that the expected status code was not received.
CloseNoStatusReceived: "no status",

// 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
// Used when the connection is abnormally closed when expecting a status code (i.e., no close frame was sent).
CloseAbnormalClosure: "abnormal closure",

// 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
// The connection was terminated due to receiving data in an incorrect format (e.g., non-UTF-8 data in a text message).
CloseUnsupportedData: "invalid payload data",

// 由于违反策略而断开连接.
// The connection was terminated due to a policy violation.
ClosePolicyViolation: "policy violation",

// 由于消息过大而断开连接.
// The connection was terminated because the message was too large.
CloseMessageTooLarge: "message too large",

// 由于缺少必要的扩展而断开连接.
// The connection was terminated due to a mandatory extension missing.
CloseMissingExtension: "mandatory extension missing",

// 由于内部服务器错误而断开连接.
// The connection was terminated due to an internal server error.
0: "empty code",
CloseNormalClosure: "close normal",
CloseGoingAway: "client going away",
CloseProtocolError: "protocol error",
CloseUnsupported: "unsupported data",
CloseNoStatusReceived: "no status",
CloseAbnormalClosure: "abnormal closure",
CloseUnsupportedData: "invalid payload data",
ClosePolicyViolation: "policy violation",
CloseMessageTooLarge: "message too large",
CloseMissingExtension: "mandatory extension missing",
CloseInternalServerErr: "internal server error",

// 由于服务器重启而断开连接.
// The connection was terminated because the server is restarting.
CloseServiceRestart: "server restarting",

// 由于服务器过载或其他原因, 建议客户端稍后重试.
// The connection was terminated due to server overload or other reasons, suggesting the client try again later.
CloseTryAgainLater: "try again later",

// 由于 TLS 握手失败而断开连接.
// The connection was terminated due to a TLS handshake failure.
CloseTLSHandshake: "TLS handshake error",
CloseServiceRestart: "server restarting",
CloseTryAgainLater: "try again later",
CloseTLSHandshake: "TLS handshake error",
}

// StatusCode WebSocket错误码
Expand Down
1 change: 1 addition & 0 deletions internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func ToBinaryNumber[T Integer](n T) T {
return x
}

// BinaryPow 返回2的n次方
func BinaryPow(n int) int {
var ans = 1
for i := 0; i < n; i++ {
Expand Down
Loading

0 comments on commit 354b5e7

Please sign in to comment.