From f55a4126763b277e4866500d1f80bf526c783ced Mon Sep 17 00:00:00 2001 From: lxzan Date: Thu, 15 Aug 2024 15:29:42 +0800 Subject: [PATCH] Add SetBufferThreshold Method --- .golangci.yaml | 4 +- client.go | 22 +++---- compress.go | 46 ++++++------- conn.go | 28 ++++---- init.go | 18 +++++- internal/error.go | 72 ++++----------------- internal/utils.go | 1 + option.go | 12 ++-- session_storage.go | 157 ++++++++++++--------------------------------- task.go | 18 +++--- types.go | 48 +++++++------- upgrader.go | 46 ++++++------- writer.go | 22 +++---- 13 files changed, 192 insertions(+), 302 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index ca8df7c0..8b56cc76 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -3,8 +3,8 @@ linters: # Disable specific linter # https://golangci-lint.run/usage/linters/#disabled-by-default disable: + - mnd - testpackage - - nosnakecase - nlreturn - gomnd - forcetypeassert @@ -14,7 +14,6 @@ linters: - ineffassign - lll - funlen - - scopelint - dupl - gofumpt - gofmt @@ -22,7 +21,6 @@ linters: - gci - goimports - gocognit - - ifshort - gochecknoinits - goconst - depguard diff --git a/client.go b/client.go index 606af882..6c59ac20 100644 --- a/client.go +++ b/client.go @@ -16,10 +16,10 @@ import ( ) // Dialer 拨号器接口 -// dialer interface +// Dialer interface type Dialer interface { // Dial 连接到指定网络上的地址 - // connects to the address on the named network + // Connects to the address on the named network Dial(network, addr string) (c net.Conn, err error) } @@ -31,7 +31,7 @@ type connector struct { } // NewClient 创建一个新的 WebSocket 客户端连接 -// creates a new WebSocket client connection +// Creates a new WebSocket client connection func NewClient(handler Event, option *ClientOption) (*Conn, *http.Response, error) { option = initClientOption(option) c := &connector{option: option, eventHandler: handler} @@ -84,7 +84,7 @@ func NewClientFromConn(handler Event, option *ClientOption, conn net.Conn) (*Con return client, resp, err } -// request 发送HTTP请求, 即WebSocket握手 +// 发送HTTP请求, 即WebSocket握手 // Sends an http request, i.e., websocket handshake func (c *connector) request() (*http.Response, *bufio.Reader, error) { _ = c.conn.SetDeadline(time.Now().Add(c.option.HandshakeTimeout)) @@ -138,7 +138,7 @@ func (c *connector) request() (*http.Response, *bufio.Reader, error) { return resp, br, err } -// getPermessageDeflate 获取压缩拓展结果 +// 获取压缩拓展结果 // Get compression expansion results func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate { serverPD := permessageNegotiation(extensions) @@ -157,8 +157,8 @@ func (c *connector) getPermessageDeflate(extensions string) PermessageDeflate { return pd } -// handshake 执行 WebSocket 握手操作 -// performs the WebSocket handshake operation +// 执行 WebSocket 握手操作 +// Performs the WebSocket handshake operation func (c *connector) handshake() (*Conn, *http.Response, error) { resp, br, err := c.request() if err != nil { @@ -202,8 +202,8 @@ func (c *connector) handshake() (*Conn, *http.Response, error) { return socket, resp, c.conn.SetDeadline(time.Time{}) } -// getSubProtocol 从响应中获取子协议 -// retrieves the subprotocol from the response +// 从响应中获取子协议 +// Retrieves the subprotocol from the response func (c *connector) getSubProtocol(resp *http.Response) (string, error) { a := internal.Split(c.option.RequestHeader.Get(internal.SecWebSocketProtocol.Key), ",") b := internal.Split(resp.Header.Get(internal.SecWebSocketProtocol.Key), ",") @@ -214,8 +214,8 @@ func (c *connector) getSubProtocol(resp *http.Response) (string, error) { return subprotocol, nil } -// checkHeaders 检查响应头以验证握手是否成功 -// checks the response headers to verify if the handshake was successful +// 检查响应头以验证握手是否成功 +// Checks the response headers to verify if the handshake was successful func (c *connector) checkHeaders(resp *http.Response) error { if resp.StatusCode != http.StatusSwitchingProtocols { return ErrHandshake diff --git a/compress.go b/compress.go index 1a4a73e2..77ae64bb 100644 --- a/compress.go +++ b/compress.go @@ -14,8 +14,8 @@ import ( "github.com/lxzan/gws/internal" ) -// flateTail deflate压缩算法的尾部标记 -// the tail marker of the deflate compression algorithm +// deflate压缩算法的尾部标记 +// The tail marker of the deflate compression algorithm var flateTail = []byte{0x00, 0x00, 0xff, 0xff, 0x01, 0x00, 0x00, 0xff, 0xff} type deflaterPool struct { @@ -24,8 +24,8 @@ type deflaterPool struct { pool []*deflater } -// initialize 初始化deflaterPool -// initialize the deflaterPool +// 初始化deflaterPool +// Initialize the deflaterPool func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflaterPool { c.num = uint64(options.PoolSize) for i := uint64(0); i < c.num; i++ { @@ -35,7 +35,7 @@ func (c *deflaterPool) initialize(options PermessageDeflate, limit int) *deflate } // Select 从deflaterPool中选择一个deflater对象 -// select a deflater object from the deflaterPool +// Select a deflater object from the deflaterPool func (c *deflaterPool) Select() *deflater { var j = atomic.AddUint64(&c.serial, 1) & (c.num - 1) return c.pool[j] @@ -51,8 +51,8 @@ type deflater struct { cpsWriter *flate.Writer } -// initialize 初始化deflater -// initialize the deflater +// 初始化deflater +// Initialize the deflater func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit int) *deflater { c.dpsReader = flate.NewReader(nil) c.dpsBuffer = bytes.NewBuffer(nil) @@ -67,19 +67,19 @@ func (c *deflater) initialize(isServer bool, options PermessageDeflate, limit in return c } -// resetFR 重置deflate reader -// reset the deflate reader +// 重置deflate reader +// Reset the deflate reader func (c *deflater) resetFR(r io.Reader, dict []byte) { resetter := c.dpsReader.(flate.Resetter) _ = resetter.Reset(r, dict) // must return a null pointer - if c.dpsBuffer.Cap() > 256*1024 { + if c.dpsBuffer.Cap() > int(bufferThreshold) { c.dpsBuffer = bytes.NewBuffer(nil) } c.dpsBuffer.Reset() } // Decompress 解压 -// decompress data +// Decompress data func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, error) { c.dpsLocker.Lock() defer c.dpsLocker.Unlock() @@ -96,7 +96,7 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er } // Compress 压缩 -// compress data +// Compress data func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error { c.cpsLocker.Lock() defer c.cpsLocker.Unlock() @@ -117,16 +117,16 @@ func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte return nil } -// slideWindow 滑动窗口 -// sliding window +// 滑动窗口 +// Sliding window type slideWindow struct { enabled bool dict []byte size int } -// initialize 初始化滑动窗口 -// initialize the sliding window +// 初始化滑动窗口 +// Initialize the sliding window func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *slideWindow { c.enabled = true c.size = internal.BinaryPow(windowBits) @@ -139,7 +139,7 @@ func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *s } // Write 将数据写入滑动窗口 -// write data to the sliding window +// Write data to the sliding window func (c *slideWindow) Write(p []byte) (int, error) { if !c.enabled { return 0, nil @@ -169,8 +169,8 @@ func (c *slideWindow) Write(p []byte) (int, error) { return total, nil } -// genRequestHeader 生成请求头 -// generate request headers +// 生成请求头 +// Generate request headers func (c *PermessageDeflate) genRequestHeader() string { var options = make([]string, 0, 5) options = append(options, internal.PermessageDeflate) @@ -191,8 +191,8 @@ func (c *PermessageDeflate) genRequestHeader() string { return strings.Join(options, "; ") } -// genResponseHeader 生成响应头 -// generate response headers +// 生成响应头 +// Generate response headers func (c *PermessageDeflate) genResponseHeader() string { var options = make([]string, 0, 5) options = append(options, internal.PermessageDeflate) @@ -211,7 +211,7 @@ func (c *PermessageDeflate) genResponseHeader() string { return strings.Join(options, "; ") } -// permessageNegotiation 压缩拓展协商 +// 压缩拓展协商 // Negotiation of compression parameters func permessageNegotiation(str string) PermessageDeflate { var options = PermessageDeflate{ @@ -250,7 +250,7 @@ func permessageNegotiation(str string) PermessageDeflate { return options } -// limitReader 限制从io.Reader中最多读取m个字节 +// 限制从io.Reader中最多读取m个字节 // Limit reading up to m bytes from io.Reader func limitReader(r io.Reader, m int) io.Reader { return &limitedReader{R: r, M: m} } diff --git a/conn.go b/conn.go index 77087013..60842db4 100644 --- a/conn.go +++ b/conn.go @@ -13,8 +13,8 @@ import ( "github.com/lxzan/gws/internal" ) -// Conn 结构体表示一个 WebSocket 连接 -// Conn struct represents a WebSocket connection +// Conn WebSocket连接 +// WebSocket connection type Conn struct { // 互斥锁,用于保护共享资源 // Mutex to protect shared resources @@ -26,7 +26,7 @@ type Conn struct { // 用于存储错误的原子值 // Atomic value for storing errors - err atomic.Value + ev atomic.Value // 标识是否为服务器端 // Indicates if this is a server-side connection @@ -76,16 +76,16 @@ type Conn struct { // Deflater deflater *deflater - // 数据包发送窗口 - // Data packet send window + // 解压字典滑动窗口 + // Decompressing dictionary sliding window dpsWindow slideWindow - // 数据包接收窗口 - // Data packet receive window + // 压缩字典滑动窗口 + // Compressed dictionary sliding window cpsWindow slideWindow - // 每消息压缩 - // Per-message deflate + // 压缩拓展配置 + // Compression extension configuration pd PermessageDeflate } @@ -105,7 +105,7 @@ func (c *Conn) ReadLoop() { } } - err, ok := c.err.Load().(error) + err, ok := c.ev.Load().(error) c.handler.OnClose(c, internal.SelectValue(ok, err, errEmpty)) // 回收资源 @@ -185,7 +185,7 @@ func (c *Conn) isClosed() bool { // 关闭连接并存储错误信息 // Closes the connection and stores the error information func (c *Conn) close(reason []byte, err error) { - c.err.Store(err) + c.ev.Store(err) _ = c.doWrite(OpcodeCloseConnection, internal.Bytes(reason)) _ = c.conn.Close() } @@ -310,9 +310,9 @@ func (c *Conn) NetConn() net.Conn { return c.conn } -// SetNoDelay -// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle 算法)。 -// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送。 +// SetNoDelay 设置无延迟 +// 控制操作系统是否应该延迟数据包传输以期望发送更少的数据包(Nagle算法). +// 默认值是 true(无延迟),这意味着数据在 Write 之后尽快发送. // Controls whether the operating system should delay packet transmission in hopes of sending fewer packets (Nagle's algorithm). // The default is true (no delay), meaning that data is sent as soon as possible after a Write. func (c *Conn) SetNoDelay(noDelay bool) error { diff --git a/init.go b/init.go index a47b9963..ba8ac29d 100644 --- a/init.go +++ b/init.go @@ -3,7 +3,19 @@ package gws import "github.com/lxzan/gws/internal" var ( - framePadding = frameHeader{} // 帧头填充物 - binaryPool = internal.NewBufferPool(128, 256*1024) // 内存池 - defaultLogger = new(stdLogger) // 默认日志工具 + framePadding = frameHeader{} // 帧头填充物 + defaultLogger = new(stdLogger) // 默认日志工具 + bufferThreshold = uint32(256 * 1024) // buffer阈值 + binaryPool = new(internal.BufferPool) // 内存池 ) + +func init() { + SetBufferThreshold(bufferThreshold) +} + +// SetBufferThreshold 设置buffer阈值, x=pow(2,n), 超过x个字节的buffer不会被回收 +// Set the buffer threshold, x=pow(2,n), that buffers larger than x bytes are not reclaimed. +func SetBufferThreshold(x uint32) { + bufferThreshold = internal.ToBinaryNumber(x) + binaryPool = internal.NewBufferPool(128, bufferThreshold) +} diff --git a/internal/error.go b/internal/error.go index fcfa9a89..518fd023 100644 --- a/internal/error.go +++ b/internal/error.go @@ -3,65 +3,21 @@ package internal // closeErrorMap 将状态码映射到错误信息 // map status codes to error messages var closeErrorMap = map[StatusCode]string{ - // 空状态码 - // Empty status code - 0: "empty code", - - // 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务. - // Normal closure; the connection was closed successfully for a purpose. - CloseNormalClosure: "close normal", - - // 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开. - // The terminal is leaving, possibly due to a server error, or because the browser is leaving the page with an open connection. - CloseGoingAway: "client going away", - - // 由于协议错误而中断连接. - // The connection was terminated due to a protocol error. - CloseProtocolError: "protocol error", - - // 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据). - // The connection was terminated due to receiving an unsupported data type (e.g., a terminal that only receives text data received binary data). - CloseUnsupported: "unsupported data", - - // 表示没有收到预期的状态码. - // Indicates that the expected status code was not received. - CloseNoStatusReceived: "no status", - - // 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧). - // Used when the connection is abnormally closed when expecting a status code (i.e., no close frame was sent). - CloseAbnormalClosure: "abnormal closure", - - // 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据). - // The connection was terminated due to receiving data in an incorrect format (e.g., non-UTF-8 data in a text message). - CloseUnsupportedData: "invalid payload data", - - // 由于违反策略而断开连接. - // The connection was terminated due to a policy violation. - ClosePolicyViolation: "policy violation", - - // 由于消息过大而断开连接. - // The connection was terminated because the message was too large. - CloseMessageTooLarge: "message too large", - - // 由于缺少必要的扩展而断开连接. - // The connection was terminated due to a mandatory extension missing. - CloseMissingExtension: "mandatory extension missing", - - // 由于内部服务器错误而断开连接. - // The connection was terminated due to an internal server error. + 0: "empty code", + CloseNormalClosure: "close normal", + CloseGoingAway: "client going away", + CloseProtocolError: "protocol error", + CloseUnsupported: "unsupported data", + CloseNoStatusReceived: "no status", + CloseAbnormalClosure: "abnormal closure", + CloseUnsupportedData: "invalid payload data", + ClosePolicyViolation: "policy violation", + CloseMessageTooLarge: "message too large", + CloseMissingExtension: "mandatory extension missing", CloseInternalServerErr: "internal server error", - - // 由于服务器重启而断开连接. - // The connection was terminated because the server is restarting. - CloseServiceRestart: "server restarting", - - // 由于服务器过载或其他原因, 建议客户端稍后重试. - // The connection was terminated due to server overload or other reasons, suggesting the client try again later. - CloseTryAgainLater: "try again later", - - // 由于 TLS 握手失败而断开连接. - // The connection was terminated due to a TLS handshake failure. - CloseTLSHandshake: "TLS handshake error", + CloseServiceRestart: "server restarting", + CloseTryAgainLater: "try again later", + CloseTLSHandshake: "TLS handshake error", } // StatusCode WebSocket错误码 diff --git a/internal/utils.go b/internal/utils.go index 06839e98..bff0fde2 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -184,6 +184,7 @@ func ToBinaryNumber[T Integer](n T) T { return x } +// BinaryPow 返回2的n次方 func BinaryPow(n int) int { var ans = 1 for i := 0; i < n; i++ { diff --git a/option.go b/option.go index 2696b27e..374b5ca4 100644 --- a/option.go +++ b/option.go @@ -150,7 +150,7 @@ type ( } // ServerOption 服务端配置 - // server configurations + // Server configurations ServerOption struct { // 配置 // Configuration @@ -225,7 +225,7 @@ type ( // 设置压缩阈值 // 开启上下文接管时, 必须不论长短压缩全部消息, 否则浏览器会报错 -// when context takeover is enabled, all messages must be compressed regardless of length, +// When context takeover is enabled, all messages must be compressed regardless of length, // otherwise the browser will report an error. func (c *PermessageDeflate) setThreshold(isServer bool) { if (isServer && c.ServerContextTakeover) || (!isServer && c.ClientContextTakeover) { @@ -233,8 +233,8 @@ func (c *PermessageDeflate) setThreshold(isServer bool) { } } -// deleteProtectedHeaders 删除受保护的 WebSocket 头部字段 -// removes protected WebSocket header fields +// 删除受保护的 WebSocket 头部字段 +// Removes protected WebSocket header fields func (c *ServerOption) deleteProtectedHeaders() { c.ResponseHeader.Del(internal.Upgrade.Key) c.ResponseHeader.Del(internal.Connection.Key) @@ -342,7 +342,7 @@ func initServerOption(c *ServerOption) *ServerOption { func (c *ServerOption) getConfig() *Config { return c.config } // ClientOption 客户端配置 -// client configurations +// Client configurations type ClientOption struct { // 写缓冲区的大小, v1.4.5版本此参数被废弃 // Deprecated: Size of the write buffer, v1.4.5 version of this parameter is deprecated @@ -473,7 +473,7 @@ func initClientOption(c *ClientOption) *ClientOption { } // 将 ClientOption 的配置转换为 Config 并返回 -// converts the ClientOption configuration to Config and returns it +// Converts the ClientOption configuration to Config and returns it func (c *ClientOption) getConfig() *Config { config := &Config{ ParallelEnabled: c.ParallelEnabled, diff --git a/session_storage.go b/session_storage.go index 992cc627..fd8c20da 100644 --- a/session_storage.go +++ b/session_storage.go @@ -7,45 +7,45 @@ import ( "github.com/lxzan/gws/internal" ) -// SessionStorage 接口定义了会话存储的基本操作 -// The SessionStorage interface defines basic operations for session storage +// SessionStorage 会话存储 type SessionStorage interface { // Len 返回存储中的键值对数量 - // Len returns the number of key-value pairs in the storage + // Returns the number of key-value pairs in the storage Len() int // Load 根据键获取值,如果键存在则返回值和 true,否则返回 nil 和 false - // Load retrieves the value for a given key. If the key exists, it returns the value and true; otherwise, it returns nil and false + // retrieves the value for a given key. If the key exists, it returns the value and true; otherwise, it returns nil and false Load(key string) (value any, exist bool) // Delete 根据键删除存储中的键值对 - // Delete removes the key-value pair from the storage for a given key + // removes the key-value pair from the storage for a given key Delete(key string) // Store 存储键值对 - // Store saves the key-value pair in the storage + // saves the key-value pair in the storage Store(key string, value any) - // Range 遍历存储中的所有键值对,并对每个键值对执行给定的函数 - // Range iterates over all key-value pairs in the storage and executes the given function for each pair + // Range 遍历 + // 如果函数返回 false,遍历将提前终止. + // If the function returns false, the iteration stops early. Range(f func(key string, value any) bool) } // newSmap 创建并返回一个新的 smap 实例 -// newSmap creates and returns a new smap instance +// creates and returns a new smap instance func newSmap() *smap { return &smap{data: make(map[string]any)} } -// smap 是一个基于 map 的会话存储实现 -// smap is a map-based implementation of the session storage +// smap 基于 map 的会话存储实现 +// map-based implementation of the session storage type smap struct { sync.Mutex data map[string]any } // Len 返回存储中的键值对数量 -// Len returns the number of key-value pairs in the storage +// returns the number of key-value pairs in the storage func (c *smap) Len() int { c.Lock() defer c.Unlock() @@ -53,7 +53,7 @@ func (c *smap) Len() int { } // Load 根据键获取值,如果键存在则返回值和 true,否则返回 nil 和 false -// Load retrieves the value for a given key. If the key exists, it returns the value and true; otherwise, it returns nil and false +// retrieves the value for a given key. If the key exists, it returns the value and true; otherwise, it returns nil and false func (c *smap) Load(key string) (value any, exist bool) { c.Lock() defer c.Unlock() @@ -62,7 +62,7 @@ func (c *smap) Load(key string) (value any, exist bool) { } // Delete 根据键删除存储中的键值对 -// Delete removes the key-value pair from the storage for a given key +// removes the key-value pair from the storage for a given key func (c *smap) Delete(key string) { c.Lock() defer c.Unlock() @@ -70,15 +70,14 @@ func (c *smap) Delete(key string) { } // Store 存储键值对 -// Store saves the key-value pair in the storage +// saves the key-value pair in the storage func (c *smap) Store(key string, value any) { c.Lock() defer c.Unlock() c.data[key] = value } -// Range 遍历存储中的所有键值对,并对每个键值对执行给定的函数 -// Range iterates over all key-value pairs in the storage and executes the given function for each pair +// Range 遍历 func (c *smap) Range(f func(key string, value any) bool) { c.Lock() defer c.Unlock() @@ -91,74 +90,49 @@ func (c *smap) Range(f func(key string, value any) bool) { } type ( - // ConcurrentMap 是一个并发安全的映射结构 - // ConcurrentMap is a concurrency-safe map structure + // ConcurrentMap 并发安全的映射结构 + // concurrency-safe map structure ConcurrentMap[K comparable, V any] struct { // hasher 用于计算键的哈希值 - // hasher is used to compute the hash value of keys + // compute the hash value of keys hasher maphash.Hasher[K] // num 表示分片的数量 - // num represents the number of shardings + // represents the number of shardings num uint64 // shardings 存储实际的分片映射 - // shardings stores the actual sharding maps + // stores the actual sharding maps shardings []*Map[K, V] } ) // NewConcurrentMap 创建一个新的并发安全映射 -// NewConcurrentMap creates a new concurrency-safe map +// creates a new concurrency-safe map // arg0 表示分片的数量;arg1 表示分片的初始化容量 // arg0 represents the number of shardings; arg1 represents the initialized capacity of a sharding. func NewConcurrentMap[K comparable, V any](size ...uint64) *ConcurrentMap[K, V] { - // 确保 size 至少有两个元素,默认值为 0 - // Ensure size has at least two elements, defaulting to 0 size = append(size, 0, 0) - - // 获取分片数量和初始化容量 - // Get the number of shardings and the initial capacity num, capacity := size[0], size[1] - - // 将分片数量调整为二进制数 - // Adjust the number of shardings to a binary number num = internal.ToBinaryNumber(internal.SelectValue(num <= 0, 16, num)) - - // 创建 ConcurrentMap 实例 - // Create a ConcurrentMap instance var cm = &ConcurrentMap[K, V]{ hasher: maphash.NewHasher[K](), num: num, shardings: make([]*Map[K, V], num), } - - // 初始化每个分片 - // Initialize each sharding - for i := range cm.shardings { + for i, _ := range cm.shardings { cm.shardings[i] = NewMap[K, V](int(capacity)) } - - // 返回创建的 ConcurrentMap 实例 - // Return the created ConcurrentMap instance return cm } // GetSharding 返回一个键的分片映射 -// GetSharding returns a map sharding for a key +// returns a map sharding for a key // 分片中的操作是无锁的,需要手动加锁 // The operations inside the sharding are lockless and need to be locked manually. func (c *ConcurrentMap[K, V]) GetSharding(key K) *Map[K, V] { - // 计算键的哈希值 - // Calculate the hash code for the key var hashCode = c.hasher.Hash(key) - - // 计算分片索引 - // Calculate the sharding index var index = hashCode & (c.num - 1) - - // 返回对应索引的分片 - // Return the sharding at the calculated index return c.shardings[index] } @@ -166,84 +140,53 @@ func (c *ConcurrentMap[K, V]) GetSharding(key K) *Map[K, V] { // Len returns the number of elements in the map func (c *ConcurrentMap[K, V]) Len() int { var length = 0 - - // 遍历所有分片并累加它们的长度 - // Iterate over all shardings and sum their lengths for _, b := range c.shardings { b.Lock() length += b.Len() b.Unlock() } - - // 返回总长度 - // Return the total length return length } // Load 返回映射中键对应的值,如果不存在则返回 nil -// Load returns the value stored in the map for a key, or nil if no value is present +// returns the value stored in the map for a key, or nil if no value is present // ok 结果表示是否在映射中找到了值 // The ok result indicates whether the value was found in the map func (c *ConcurrentMap[K, V]) Load(key K) (value V, ok bool) { - // 获取键对应的分片 - // Get the sharding for the key var b = c.GetSharding(key) - - // 加锁并加载值 - // Lock and load the value b.Lock() value, ok = b.Load(key) b.Unlock() - - // 返回值和状态 - // Return the value and status return } // Delete 删除键对应的值 // Delete deletes the value for a key func (c *ConcurrentMap[K, V]) Delete(key K) { - // 获取键对应的分片 - // Get the sharding for the key var b = c.GetSharding(key) - - // 加锁并删除值 - // Lock and delete the value b.Lock() b.Delete(key) b.Unlock() } // Store 设置键对应的值 -// Store sets the value for a key +// sets the value for a key func (c *ConcurrentMap[K, V]) Store(key K, value V) { - // 获取键对应的分片 - // Get the sharding for the key var b = c.GetSharding(key) - - // 加锁并存储值 - // Lock and store the value b.Lock() b.Store(key, value) b.Unlock() } -// Range 依次为映射中的每个键和值调用 f -// Range calls f sequentially for each key and value present in the map +// Range 遍历 // 如果 f 返回 false,遍历停止 // If f returns false, range stops the iteration func (c *ConcurrentMap[K, V]) Range(f func(key K, value V) bool) { var next = true - - // 包装回调函数以检查是否继续 - // Wrap the callback function to check whether to continue var cb = func(k K, v V) bool { next = f(k, v) return next } - - // 遍历所有分片并调用回调函数 - // Iterate over all shardings and call the callback function for i := uint64(0); i < c.num && next; i++ { var b = c.shardings[i] b.Lock() @@ -252,74 +195,54 @@ func (c *ConcurrentMap[K, V]) Range(f func(key K, value V) bool) { } } -// Map 是一个线程安全的泛型映射类型。 -// Map is a thread-safe generic map type. +// Map 线程安全的泛型映射类型. +// thread-safe generic map type. type Map[K comparable, V any] struct { - // Mutex 用于确保并发访问的安全性 - // Mutex is used to ensure safety for concurrent access sync.Mutex - - // m 是实际存储键值对的底层映射 - // m is the underlying map that stores key-value pairs m map[K]V } -// NewMap 创建一个新的 Map 实例。 -// NewMap creates a new instance of Map. -// size 参数用于指定初始容量,如果未提供则默认为 0。 +// NewMap 创建一个新的 Map 实例 +// creates a new instance of Map +// size 参数用于指定初始容量,如果未提供则默认为 0 // The size parameter is used to specify the initial capacity, defaulting to 0 if not provided. func NewMap[K comparable, V any](size ...int) *Map[K, V] { - // 初始化容量为 0 - // Initialize capacity to 0 var capacity = 0 - - // 如果提供了 size 参数,则使用第一个值作为容量 - // If the size parameter is provided, use the first value as the capacity if len(size) > 0 { capacity = size[0] } - - // 创建一个新的 Map 实例 - // Create a new instance of Map c := new(Map[K, V]) - - // 初始化底层映射,使用指定的容量 - // Initialize the underlying map with the specified capacity c.m = make(map[K]V, capacity) - - // 返回创建的 Map 实例 - // Return the created Map instance return c } -// Len 返回 Map 中的元素数量。 +// Len 返回 Map 中的元素数量. // Len returns the number of elements in the Map. func (c *Map[K, V]) Len() int { return len(c.m) } -// Load 从 Map 中加载指定键的值。 +// Load 从 Map 中加载指定键的值. // Load loads the value for the specified key from the Map. func (c *Map[K, V]) Load(key K) (value V, ok bool) { value, ok = c.m[key] return } -// Delete 从 Map 中删除指定键的值。 -// Delete deletes the value for the specified key from the Map. +// Delete 从 Map 中删除指定键的值. +// deletes the value for the specified key from the Map. func (c *Map[K, V]) Delete(key K) { delete(c.m, key) } -// Store 将指定键值对存储到 Map 中。 -// Store stores the specified key-value pair in the Map. +// Store 将指定键值对存储到 Map 中. +// stores the specified key-value pair in the Map. func (c *Map[K, V]) Store(key K, value V) { c.m[key] = value } -// Range 遍历 Map 中的所有键值对,并对每个键值对执行指定的函数。 -// 如果函数返回 false,遍历将提前终止。 -// Range iterates over all key-value pairs in the Map and executes the specified function for each pair. +// Range 遍历 +// 如果函数返回 false,遍历将提前终止. // If the function returns false, the iteration stops early. func (c *Map[K, V]) Range(f func(K, V) bool) { for k, v := range c.m { diff --git a/task.go b/task.go index f4159269..f97a10ad 100644 --- a/task.go +++ b/task.go @@ -7,8 +7,8 @@ import ( ) type ( - // workerQueue 任务队列 - // task queue + // 任务队列 + // Task queue workerQueue struct { // mu 互斥锁 // mutex @@ -27,13 +27,13 @@ type ( curConcurrency int32 } - // asyncJob 异步任务 - // asynchronous job + // 异步任务 + // Asynchronous job asyncJob func() ) -// newWorkerQueue 创建一个任务队列 -// creates a task queue +// 创建一个任务队列 +// Creates a task queue func newWorkerQueue(maxConcurrency int32) *workerQueue { c := &workerQueue{ mu: sync.Mutex{}, @@ -44,7 +44,7 @@ func newWorkerQueue(maxConcurrency int32) *workerQueue { } // 获取一个任务 -// getJob retrieves a job from the worker queue +// Retrieves a job from the worker queue func (c *workerQueue) getJob(newJob asyncJob, delta int32) asyncJob { c.mu.Lock() defer c.mu.Unlock() @@ -65,7 +65,7 @@ func (c *workerQueue) getJob(newJob asyncJob, delta int32) asyncJob { } // 循环执行任务 -// do continuously executes jobs in the worker queue +// Do continuously executes jobs in the worker queue func (c *workerQueue) do(job asyncJob) { for job != nil { job() @@ -74,7 +74,7 @@ func (c *workerQueue) do(job asyncJob) { } // Push 追加任务, 有资源空闲的话会立即执行 -// adds a job to the queue and executes it immediately if resources are available +// Adds a job to the queue and executes it immediately if resources are available func (c *workerQueue) Push(job asyncJob) { if nextJob := c.getJob(job, 0); nextJob != nil { go c.do(nextJob) diff --git a/types.go b/types.go index 0036299d..9acc0d90 100644 --- a/types.go +++ b/types.go @@ -28,8 +28,8 @@ const ( OpcodePong Opcode = 0xA // 心跳回应 ) -// isDataFrame 判断操作码是否为数据帧 -// checks if the opcode is a data frame +// 判断操作码是否为数据帧 +// Checks if the opcode is a data frame func (c Opcode) isDataFrame() bool { return c <= OpcodeBinary } @@ -45,7 +45,7 @@ type CloseError struct { } // Error 关闭错误的描述 -// returns a description of the close error +// Returns a description of the close error func (c *CloseError) Error() string { return fmt.Sprintf("gws: connection closed, code=%d, reason=%s", c.Code, string(c.Reason)) } @@ -123,55 +123,55 @@ func (b BuiltinEventHandler) OnMessage(socket *Conn, message *Message) {} type frameHeader [frameHeaderSize]byte // GetFIN 返回 FIN 位的值 -// returns the value of the FIN bit +// Returns the value of the FIN bit func (c *frameHeader) GetFIN() bool { return ((*c)[0] >> 7) == 1 } // GetRSV1 返回 RSV1 位的值 -// returns the value of the RSV1 bit +// Returns the value of the RSV1 bit func (c *frameHeader) GetRSV1() bool { return ((*c)[0] << 1 >> 7) == 1 } // GetRSV2 返回 RSV2 位的值 -// returns the value of the RSV2 bit +// Returns the value of the RSV2 bit func (c *frameHeader) GetRSV2() bool { return ((*c)[0] << 2 >> 7) == 1 } // GetRSV3 返回 RSV3 位的值 -// returns the value of the RSV3 bit +// Returns the value of the RSV3 bit func (c *frameHeader) GetRSV3() bool { return ((*c)[0] << 3 >> 7) == 1 } // GetOpcode 返回操作码 -// returns the opcode +// Returns the opcode func (c *frameHeader) GetOpcode() Opcode { return Opcode((*c)[0] << 4 >> 4) } // GetMask 返回掩码 -// returns the value of the mask bytes +// Returns the value of the mask bytes func (c *frameHeader) GetMask() bool { return ((*c)[1] >> 7) == 1 } // GetLengthCode 返回长度代码 -// returns the length code +// Returns the length code func (c *frameHeader) GetLengthCode() uint8 { return (*c)[1] << 1 >> 1 } // SetMask 设置 Mask 位为 1 -// sets the Mask bit to 1 +// Sets the Mask bit to 1 func (c *frameHeader) SetMask() { (*c)[1] |= uint8(128) } // SetLength 设置帧的长度,并返回偏移量 -// sets the frame length and returns the offset +// Sets the frame length and returns the offset func (c *frameHeader) SetLength(n uint64) (offset int) { if n <= internal.ThresholdV1 { (*c)[1] += uint8(n) @@ -188,13 +188,13 @@ func (c *frameHeader) SetLength(n uint64) (offset int) { } // SetMaskKey 设置掩码 -// sets the mask +// Sets the mask func (c *frameHeader) SetMaskKey(offset int, key [4]byte) { copy((*c)[offset:offset+4], key[0:]) } // GenerateHeader 生成帧头 -// generates a frame header +// Generates a frame header // 可以考虑每个客户端连接带一个随机数发生器 // Consider having a random number generator for each client connection func (c *frameHeader) GenerateHeader(isServer bool, fin bool, compress bool, opcode Opcode, length int) (headerLength int, maskBytes []byte) { @@ -220,7 +220,7 @@ func (c *frameHeader) GenerateHeader(isServer bool, fin bool, compress bool, opc } // Parse 解析完整协议头, 最多14字节, 返回payload长度 -// parses the complete protocol header, up to 14 bytes, and returns the payload length +// Parses the complete protocol header, up to 14 bytes, and returns the payload length func (c *frameHeader) Parse(reader io.Reader) (int, error) { if err := internal.ReadN(reader, (*c)[0:2]); err != nil { return 0, err @@ -255,7 +255,7 @@ func (c *frameHeader) Parse(reader io.Reader) (int, error) { } // GetMaskKey 返回掩码 -// returns the mask +// Returns the mask func (c *frameHeader) GetMaskKey() []byte { return (*c)[10:14] } @@ -275,19 +275,19 @@ type Message struct { } // Read 从消息中读取数据到给定的字节切片 p 中 -// reads data from the message into the given byte slice p +// Reads data from the message into the given byte slice p func (c *Message) Read(p []byte) (n int, err error) { return c.Data.Read(p) } // Bytes 返回消息的数据缓冲区的字节切片 -// returns the byte slice of the message's data buffer +// Returns the byte slice of the message's data buffer func (c *Message) Bytes() []byte { return c.Data.Bytes() } // Close 关闭消息, 回收资源 -// close message, recycling resources +// Close message, recycling resources func (c *Message) Close() error { binaryPool.Put(c.Data) c.Data = nil @@ -312,8 +312,8 @@ type continuationFrame struct { buffer *bytes.Buffer } -// reset 重置延续帧的状态 -// resets the state of the continuation frame +// 重置延续帧的状态 +// Resets the state of the continuation frame func (c *continuationFrame) reset() { c.initialized = false c.compressed = false @@ -322,15 +322,15 @@ func (c *continuationFrame) reset() { } // Logger 日志接口 -// logger interface +// Logger interface type Logger interface { // Error 打印错误日志 // Printing the error log Error(v ...any) } -// stdLogger 标准日志库 -// standard Log Library +// 标准日志库 +// Standard Log Library type stdLogger struct{} // Error 打印错误日志 diff --git a/upgrader.go b/upgrader.go index df515f0c..5e4a255a 100644 --- a/upgrader.go +++ b/upgrader.go @@ -29,7 +29,7 @@ type responseWriter struct { } // Init 初始化 -// initializes the responseWriter struct +// Initializes the responseWriter struct func (c *responseWriter) Init() *responseWriter { c.b = binaryPool.Get(512) c.b.WriteString("HTTP/1.1 101 Switching Protocols\r\n") @@ -39,14 +39,14 @@ func (c *responseWriter) Init() *responseWriter { } // Close 回收资源 -// recycling resources +// Recycling resources func (c *responseWriter) Close() { binaryPool.Put(c.b) c.b = nil } // WithHeader 添加 HTTP Header -// adds an http header +// Adds an http header func (c *responseWriter) WithHeader(k, v string) { c.b.WriteString(k) c.b.WriteString(": ") @@ -55,7 +55,7 @@ func (c *responseWriter) WithHeader(k, v string) { } // WithExtraHeader 添加额外的 HTTP Header -// adds extra http header +// Adds extra http header func (c *responseWriter) WithExtraHeader(h http.Header) { for k, _ := range h { c.WithHeader(k, h.Get(k)) @@ -63,7 +63,7 @@ func (c *responseWriter) WithExtraHeader(h http.Header) { } // WithSubProtocol 根据请求头和预期的子协议列表设置子协议 -// sets the subprotocol based on the request header and the expected subprotocols list +// Sets the subprotocol based on the request header and the expected subprotocols list func (c *responseWriter) WithSubProtocol(requestHeader http.Header, expectedSubProtocols []string) { if len(expectedSubProtocols) > 0 { c.subprotocol = internal.GetIntersectionElem(expectedSubProtocols, internal.Split(requestHeader.Get(internal.SecWebSocketProtocol.Key), ",")) @@ -76,7 +76,7 @@ func (c *responseWriter) WithSubProtocol(requestHeader http.Header, expectedSubP } // Write 将缓冲区内容写入连接,并设置超时 -// writes the buffer content to the connection and sets the timeout +// Writes the buffer content to the connection and sets the timeout func (c *responseWriter) Write(conn net.Conn, timeout time.Duration) error { if c.err != nil { return c.err @@ -98,7 +98,7 @@ type Upgrader struct { } // NewUpgrader 创建一个新的 Upgrader 实例 -// creates a new instance of Upgrader +// Creates a new instance of Upgrader func NewUpgrader(eventHandler Event, option *ServerOption) *Upgrader { u := &Upgrader{ option: initServerOption(option), @@ -111,8 +111,8 @@ func NewUpgrader(eventHandler Event, option *ServerOption) *Upgrader { return u } -// hijack 劫持 HTTP 连接并返回底层的网络连接和缓冲读取器 -// hijack hijacks the HTTP connection and returns the underlying network connection and buffered reader +// 劫持 HTTP 连接并返回底层的网络连接和缓冲读取器 +// Hijacks the HTTP connection and returns the underlying network connection and buffered reader func (c *Upgrader) hijack(w http.ResponseWriter) (net.Conn, *bufio.Reader, error) { hj, ok := w.(http.Hijacker) if !ok { @@ -127,8 +127,8 @@ func (c *Upgrader) hijack(w http.ResponseWriter) (net.Conn, *bufio.Reader, error return netConn, br, nil } -// getPermessageDeflate 根据客户端和服务器的扩展协商结果获取 PermessageDeflate 配置 -// gets the PermessageDeflate configuration based on the negotiation results between the client and server extensions +// 根据客户端和服务器的扩展协商结果获取 PermessageDeflate 配置 +// Gets the PermessageDeflate configuration based on the negotiation results between the client and server extensions func (c *Upgrader) getPermessageDeflate(extensions string) PermessageDeflate { clientPD := permessageNegotiation(extensions) serverPD := c.option.PermessageDeflate @@ -147,7 +147,7 @@ func (c *Upgrader) getPermessageDeflate(extensions string) PermessageDeflate { } // Upgrade 升级 HTTP 连接到 WebSocket 连接 -// upgrades the HTTP connection to a WebSocket connection +// Upgrades the HTTP connection to a WebSocket connection func (c *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error) { netConn, br, err := c.hijack(w) if err != nil { @@ -157,7 +157,7 @@ func (c *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request) (*Conn, error } // UpgradeFromConn 从现有的网络连接升级到 WebSocket 连接 -// upgrades from an existing network connection to a WebSocket connection +// Upgrades from an existing network connection to a WebSocket connection func (c *Upgrader) UpgradeFromConn(conn net.Conn, br *bufio.Reader, r *http.Request) (*Conn, error) { socket, err := c.doUpgradeFromConn(conn, br, r) if err != nil { @@ -167,8 +167,8 @@ func (c *Upgrader) UpgradeFromConn(conn net.Conn, br *bufio.Reader, r *http.Requ return socket, err } -// writeErr 向客户端写入 HTTP 错误响应 -// writes an HTTP error response to the client +// 向客户端写入 HTTP 错误响应 +// Writes an HTTP error response to the client func (c *Upgrader) writeErr(conn net.Conn, err error) error { var str = err.Error() var buf = binaryPool.Get(256) @@ -183,8 +183,8 @@ func (c *Upgrader) writeErr(conn net.Conn, err error) error { return result } -// doUpgradeFromConn 从现有的网络连接升级到 WebSocket 连接 -// upgrades from an existing network connection to a WebSocket connection +// 从现有的网络连接升级到 WebSocket 连接 +// Upgrades from an existing network connection to a WebSocket connection func (c *Upgrader) doUpgradeFromConn(netConn net.Conn, br *bufio.Reader, r *http.Request) (*Conn, error) { // 授权请求,如果授权失败,返回未授权错误 // Authorize the request, if authorization fails, return an unauthorized error @@ -257,7 +257,7 @@ func (c *Upgrader) doUpgradeFromConn(netConn net.Conn, br *bufio.Reader, r *http } // Server WebSocket服务器 -// websocket server +// Websocket server type Server struct { // 升级器,用于将 HTTP 连接升级到 WebSocket 连接 // Upgrader, used to upgrade HTTP connections to WebSocket connections @@ -277,7 +277,7 @@ type Server struct { } // NewServer 创建一个新的 WebSocket 服务器实例 -// creates a new WebSocket server instance +// Creates a new WebSocket server instance func NewServer(eventHandler Event, option *ServerOption) *Server { var c = &Server{upgrader: NewUpgrader(eventHandler, option)} c.option = c.upgrader.option @@ -294,13 +294,13 @@ func NewServer(eventHandler Event, option *ServerOption) *Server { } // GetUpgrader 获取服务器的升级器实例 -// retrieves the upgrader instance of the server +// Retrieves the upgrader instance of the server func (c *Server) GetUpgrader() *Upgrader { return c.upgrader } // Run 启动 WebSocket 服务器,监听指定地址 -// starts the WebSocket server and listens on the specified address +// Starts the WebSocket server and listens on the specified address func (c *Server) Run(addr string) error { listener, err := net.Listen("tcp", addr) if err != nil { @@ -310,7 +310,7 @@ func (c *Server) Run(addr string) error { } // RunTLS 启动支持 TLS 的 WebSocket 服务器,监听指定地址 -// starts the WebSocket server with TLS support and listens on the specified address +// Starts the WebSocket server with TLS support and listens on the specified address func (c *Server) RunTLS(addr string, certFile, keyFile string) error { cert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { @@ -332,7 +332,7 @@ func (c *Server) RunTLS(addr string, certFile, keyFile string) error { } // RunListener 使用指定的监听器运行 WebSocket 服务器 -// runs the WebSocket server using the specified listener +// Runs the WebSocket server using the specified listener func (c *Server) RunListener(listener net.Listener) error { defer listener.Close() diff --git a/writer.go b/writer.go index 311685cc..ce4e83e0 100644 --- a/writer.go +++ b/writer.go @@ -95,7 +95,7 @@ func (c *Conn) Async(f func()) { } // 执行写入逻辑, 注意妥善维护压缩字典 -// doWrite executes the write logic, ensuring proper maintenance of the compression dictionary +// Executes the write logic, ensuring proper maintenance of the compression dictionary func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { c.mu.Lock() defer c.mu.Unlock() @@ -115,8 +115,8 @@ func (c *Conn) doWrite(opcode Opcode, payload internal.Payload) error { return err } -// genFrame 生成帧数据 -// generates the frame data +// 生成帧数据 +// Generates the frame data func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { if opcode == OpcodeText && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) { return nil, internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding) @@ -148,8 +148,8 @@ func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast boo return buf, nil } -// compressData 压缩数据并生成帧 -// compresses the data and generates the frame +// 压缩数据并生成帧 +// Compresses the data and generates the frame func (c *Conn) compressData(buf *bytes.Buffer, opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) { err := c.deflater.Compress(payload, buf, c.getCpsDict(isBroadcast)) if err != nil { @@ -184,7 +184,7 @@ type ( ) // NewBroadcaster 创建广播器 -// creates a broadcaster +// Creates a broadcaster // 相比循环调用 WriteAsync, Broadcaster 只会压缩一次消息, 可以节省大量 CPU 开销. // Instead of calling WriteAsync in a loop, Broadcaster compresses the message only once, saving a lot of CPU overhead. func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster { @@ -197,8 +197,8 @@ func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster { return c } -// writeFrame 将帧数据写入连接 -// writes the frame data to the connection +// 将帧数据写入连接 +// Writes the frame data to the connection func (c *Broadcaster) writeFrame(socket *Conn, frame *bytes.Buffer) error { if socket.isClosed() { return ErrConnClosed @@ -235,8 +235,8 @@ func (c *Broadcaster) Broadcast(socket *Conn) error { return nil } -// doClose 关闭广播器并释放资源 -// closes the broadcaster and releases resources +// 释放资源 +// releases resources func (c *Broadcaster) doClose() { for _, item := range c.msgs { if item != nil { @@ -246,7 +246,7 @@ func (c *Broadcaster) doClose() { } // Close 释放资源 -// releases resources +// Releases resources // 在完成所有 Broadcast 调用之后执行 Close 方法释放资源。 // Call the Close method after all the Broadcasts have been completed to release the resources. func (c *Broadcaster) Close() error {