-
Notifications
You must be signed in to change notification settings - Fork 0
/
cnet.go
198 lines (178 loc) · 4.2 KB
/
cnet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package cnet
import (
"github.com/cuckooemm/cnet/internal"
"net"
"time"
)
type Operation int
type Network int
const (
None Operation = iota
Close
Shutdown
)
const (
Tcp Network = iota
Udp
)
type IEventCallback interface {
// tcp
// 链接连接时回调
OnConnOpened(c Conn) (out []byte, op Operation)
// 链接关闭时回调
OnConnClosed(c Conn, err error) (op Operation)
// 读事件触发
ConnHandler(c Conn) (out []byte, op Operation)
// 唤醒conn时触发 c.wake
OnWakenHandler(c Conn) (out []byte, op Operation)
// udp
// 读事件触发
PackHandler(pack []byte, p Pconn) (out []byte, op Operation)
// 发送数据错误时触发
SendErr(remoteAddr string, err error)
}
type Conn interface {
// 返回用户定义数据。
Expand() map[string]interface{}
// 设置用户定义的数据
SetExpand(data map[string]interface{})
// 协议
Network() string
// 连接的本地套接字地址
LocalAddr() string
// 连接的远程对端地址
RemoteAddr() string
// Read从入站环形缓冲区中读取所有数据,而不会移动“read”指针,不会淘汰缓冲区数据,直到调用ResetBuffer方法为止 。
Read() (int, []byte)
// ResetBuffer重置入站环形缓冲区
ResetBuffer()
// ReadN从入站环形缓冲区和读取具有给定长度的字节,不会移动“读取”指针,直到调用ShiftN方法,它才会从缓冲区中逐出数据,
ReadN(n int) (int, []byte)
// ShiftN将“read”指针移入给定长度的缓冲区中。
ShiftN(n int) int
// BufferLength 返回入站环形缓冲区中可用数据的长度。
BufferLength() int
// AsyncWrite异步将数据写入客户端/连接,通常在单个goroutine中而不是事件循环goroutine中调用它。
AsyncWrite([]byte) error
// 唤醒会为此连接触发一个React事件。
Wake() error
// 关闭当前连接
Close() error
}
type Pconn interface {
// 协议
Network() string
// 连接的本地套接字地址
LocalAddr() (addr string)
// 连接的远程对端地址
RemoteAddr() (addr string)
// SendTo为UDP套接字写入数据,它允许您在各个goroutine中将数据发送回UDP套接字。
SendTo(buf []byte) error
}
type Cnet struct {
// protocol
Network Network
// address
Addr string
// reuseport
ReusePort bool
// event-loop number
MultiCore int
// tco keepAlive
TcpKeepAlive time.Duration
// callback
Callback IEventCallback
// log
Logger Logger
}
func (c *Cnet) Listener() error {
switch c.Network {
case Tcp:
var (
ln tcpListener
opt = TcpOption{
ReusePort: c.ReusePort,
MultiCore: c.MultiCore,
Logger: c.Logger,
TcpKeepAlive: c.TcpKeepAlive,
}
err error
)
if c.ReusePort {
ln.ln, err = internal.ReusePortListen("tcp", c.Addr)
} else {
ln.ln, err = net.Listen("tcp", c.Addr)
}
if err != nil {
return err
}
if err = ln.initFd(); err != nil {
return err
}
defer ln.close()
return startTcpService(c.Callback, &ln, &opt)
case Udp:
var (
ln udpListener
opt = UdpOption{
ReusePort: c.ReusePort,
MultiCore: c.MultiCore,
Logger: c.Logger,
}
err error
)
if opt.ReusePort {
ln.ln, err = internal.ReusePortListenPacket("udp", c.Addr)
} else {
ln.ln, err = net.ListenPacket("udp", c.Addr)
}
if err != nil {
return err
}
if err = ln.initFd(); err != nil {
return err
}
defer ln.close()
return startUpdService(c.Callback, &ln, &opt)
default:
return ErrUnSupportProtocol
}
}
func TcpService(callback IEventCallback, addr string, opt TcpOption) error {
var (
ln tcpListener
err error
)
if opt.ReusePort {
ln.ln, err = internal.ReusePortListen("tcp", addr)
} else {
ln.ln, err = net.Listen("tcp", addr)
}
if err != nil {
return err
}
if err = ln.initFd(); err != nil {
return err
}
defer ln.close()
return startTcpService(callback, &ln, &opt)
}
func UdpService(callback IEventCallback, addr string, opt UdpOption) error {
var (
ln udpListener
err error
)
if opt.ReusePort {
ln.ln, err = internal.ReusePortListenPacket("udp", addr)
} else {
ln.ln, err = net.ListenPacket("udp", addr)
}
if err != nil {
return err
}
if err = ln.initFd(); err != nil {
return err
}
defer ln.close()
return startUpdService(callback, &ln, &opt)
}