-
Notifications
You must be signed in to change notification settings - Fork 7
/
send.go
executable file
·187 lines (163 loc) · 5.15 KB
/
send.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package wkproto
import (
"fmt"
"github.com/pkg/errors"
)
// SendPacket is the send packet, i.e. the frame whose type is SEND.
//
// Not every field is always present on the wire (see encodeSend/decodeSend):
// StreamNo is only carried when the protocol version is >= 2 and the
// SettingStream flag is set, Expire only when the version is >= 3, and Topic
// only when the SettingTopic flag is set.
type SendPacket struct {
Framer
Setting Setting
MsgKey string // Used to verify that this message is legitimate (guards against man-in-the-middle tampering)
Expire uint32 // Message expiry time; 0 means the message never expires
// NOTE: encodeSend/decodeSend carry ClientSeq as a uint32, so only the low 32 bits survive a round trip.
ClientSeq uint64 // Sequence number supplied by the client, unique within that client
ClientMsgNo string // Client-side unique message number, usually a UUID, used for de-duplication
StreamNo string // Stream number
ChannelID string // Channel ID (for a personal channel this is the person's UID)
ChannelType uint8 // Channel type (1: personal, 2: group)
Topic string // Message topic
Payload []byte // Message content
}
// UniqueKey builds a key for this packet by joining ChannelID, ChannelType,
// ClientMsgNo and ClientSeq, in that order, with "-" separators.
func (s *SendPacket) UniqueKey() string {
	channelID, channelType := s.ChannelID, s.ChannelType
	msgNo, seq := s.ClientMsgNo, s.ClientSeq
	return fmt.Sprintf("%s-%d-%s-%d", channelID, channelType, msgNo, seq)
}
// GetFrameType returns the frame type of this packet, which is always SEND.
func (s *SendPacket) GetFrameType() FrameType {
return SEND
}
// String renders the packet as a single human-readable line containing
// Setting, MsgKey, Expire, ClientSeq, ClientMsgNo, ChannelID, ChannelType,
// Topic and the payload converted to a string. StreamNo is not included.
func (s *SendPacket) String() string {
	payload := string(s.Payload)
	return fmt.Sprintf(
		"Setting:%v MsgKey:%s Expire: %d ClientSeq:%d ClientMsgNo:%s ChannelId:%s ChannelType:%d Topic:%s Payload:%s",
		s.Setting,
		s.MsgKey,
		s.Expire,
		s.ClientSeq,
		s.ClientMsgNo,
		s.ChannelID,
		s.ChannelType,
		s.Topic,
		payload,
	)
}
// func (s *SendPacket) reset() {
// s.Framer.RedDot = false
// s.Framer.DUP = false
// s.Framer.NoPersist = false
// s.Framer.SyncOnce = false
// s.Framer.FrameSize = 0
// s.Framer.RemainingLength = 0
// s.Setting = 0
// s.MsgKey = ""
// s.ClientSeq = 0
// s.ClientMsgNo = ""
// s.ChannelID = ""
// s.ChannelType = 0
// s.Topic = ""
// s.Payload = nil
// }
// VerityString returns the verification string for this packet: ClientSeq,
// ClientMsgNo, ChannelID, ChannelType and the payload (as a string)
// concatenated in that order with no separators.
// NOTE(review): presumably this is the input MsgKey is checked against — confirm with the caller.
func (s *SendPacket) VerityString() string {
	body := string(s.Payload)
	return fmt.Sprintf(
		"%d%s%s%d%s",
		s.ClientSeq,
		s.ClientMsgNo,
		s.ChannelID,
		s.ChannelType,
		body,
	)
}
// var sendPacketPool = sync.Pool{
// New: func() any {
// return &SendPacket{}
// },
// }
// decodeSend parses the body of a SEND frame into a *SendPacket.
//
// frame must hold the already-decoded header as a Framer value, data is the
// frame body, and version is the protocol version that decides which optional
// fields are present. Fields are read in this order: Setting, ClientSeq (read
// as a uint32), ClientMsgNo, StreamNo (only when version >= 2 and
// SettingStream is set), ChannelID, ChannelType, Expire (only when
// version >= 3), MsgKey, Topic (only when SettingTopic is set) and finally
// Payload (read via BinaryAll).
//
// It returns an error, rather than panicking, when frame is not a Framer, and
// a wrapped decoder error when any field cannot be read.
func decodeSend(frame Frame, data []byte, version uint8) (Frame, error) {
	// Use the comma-ok form so an unexpected frame type is reported as an
	// error instead of crashing the caller.
	framer, ok := frame.(Framer)
	if !ok {
		return nil, errors.Errorf("解码SendPacket失败!frame类型无效: %T", frame)
	}

	dec := NewDecoder(data)
	sendPacket := &SendPacket{Framer: framer}

	// Message settings (flag byte).
	setting, err := dec.Uint8()
	if err != nil {
		return nil, errors.Wrap(err, "解码消息设置失败!")
	}
	sendPacket.Setting = Setting(setting)

	// Message sequence number (maintained by the client); 32 bits on the wire.
	clientSeq, err := dec.Uint32()
	if err != nil {
		return nil, errors.Wrap(err, "解码ClientSeq失败!")
	}
	sendPacket.ClientSeq = uint64(clientSeq)

	// Client-side unique message identifier.
	if sendPacket.ClientMsgNo, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码ClientMsgNo失败!")
	}

	// Stream number, present only when streaming is enabled.
	if version >= 2 && sendPacket.Setting.IsSet(SettingStream) {
		if sendPacket.StreamNo, err = dec.String(); err != nil {
			return nil, errors.Wrap(err, "解码StreamNo失败!")
		}
	}

	// Channel ID.
	if sendPacket.ChannelID, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码ChannelId失败!")
	}

	// Channel type.
	if sendPacket.ChannelType, err = dec.Uint8(); err != nil {
		return nil, errors.Wrap(err, "解码ChannelType失败!")
	}

	// Message expiry time, present from protocol version 3 onwards.
	if version >= 3 {
		if sendPacket.Expire, err = dec.Uint32(); err != nil {
			return nil, errors.Wrap(err, "解码Expire失败!")
		}
	}

	// Message key.
	if sendPacket.MsgKey, err = dec.String(); err != nil {
		return nil, errors.Wrap(err, "解码MsgKey失败!")
	}

	// Topic, present only when the topic flag is set.
	if sendPacket.Setting.IsSet(SettingTopic) {
		if sendPacket.Topic, err = dec.String(); err != nil {
			return nil, errors.Wrap(err, "解密topic消息失败!")
		}
	}

	// Message content.
	if sendPacket.Payload, err = dec.BinaryAll(); err != nil {
		return nil, errors.Wrap(err, "解码payload失败!")
	}

	return sendPacket, nil
}
// encodeSend writes the body of a SEND frame to enc.
//
// frame must be a non-nil *SendPacket and version is the protocol version that
// decides which optional fields are written. Fields are written in this order:
// Setting, ClientSeq (as a uint32), ClientMsgNo, StreamNo (only when
// version >= 2 and SettingStream is set), ChannelID, ChannelType, Expire (only
// when version >= 3), MsgKey, Topic (only when SettingTopic is set) and
// finally the payload bytes.
//
// It returns an error, rather than panicking, when frame is not a non-nil
// *SendPacket; otherwise it returns nil.
func encodeSend(frame Frame, enc *Encoder, version uint8) error {
	// Use the comma-ok form so an unexpected or nil frame is reported as an
	// error instead of crashing the caller.
	sendPacket, ok := frame.(*SendPacket)
	if !ok || sendPacket == nil {
		return errors.Errorf("编码SendPacket失败!frame类型无效: %T", frame)
	}

	// Message settings (flag byte).
	// NOTE(review): the WriteByte result is discarded, as in the original;
	// presumably the encoder cannot fail here — confirm.
	_ = enc.WriteByte(sendPacket.Setting.Uint8())

	// Message sequence number (maintained by the client). ClientSeq is a
	// uint64 in memory but only its low 32 bits are written.
	enc.WriteUint32(uint32(sendPacket.ClientSeq))

	// Client-side unique message identifier.
	enc.WriteString(sendPacket.ClientMsgNo)

	// Stream number, written only when streaming is enabled.
	if version >= 2 && sendPacket.Setting.IsSet(SettingStream) {
		enc.WriteString(sendPacket.StreamNo)
	}

	// Channel ID.
	enc.WriteString(sendPacket.ChannelID)

	// Channel type.
	enc.WriteUint8(sendPacket.ChannelType)

	// Message expiry time, written from protocol version 3 onwards.
	if version >= 3 {
		enc.WriteUint32(sendPacket.Expire)
	}

	// Message key.
	enc.WriteString(sendPacket.MsgKey)

	// Topic, written only when the topic flag is set.
	if sendPacket.Setting.IsSet(SettingTopic) {
		enc.WriteString(sendPacket.Topic)
	}

	// Message content.
	enc.WriteBytes(sendPacket.Payload)

	return nil
}
// encodeSendSize computes the encoded body size, in bytes, of the SEND packet
// held in frame for the given protocol version. It counts the same fields,
// under the same conditions, that encodeSend writes: each string costs its
// byte length plus StringFixLenByteSize, and the payload costs exactly its
// length. frame must be a *SendPacket; any other type makes the type
// assertion panic.
func encodeSendSize(frame Frame, version uint8) int {
	pkt := frame.(*SendPacket)

	// strSize is the encoded size of one string field.
	strSize := func(s string) int {
		return len(s) + StringFixLenByteSize
	}

	streaming := version >= 2 && pkt.Setting.IsSet(SettingStream)
	hasTopic := pkt.Setting.IsSet(SettingTopic)

	total := SettingByteSize + ClientSeqByteSize
	total += strSize(pkt.ClientMsgNo)
	if streaming {
		total += strSize(pkt.StreamNo)
	}
	total += strSize(pkt.ChannelID)
	total += ChannelTypeByteSize
	if version >= 3 {
		total += ExpireByteSize
	}
	total += strSize(pkt.MsgKey)
	if hasTopic {
		total += strSize(pkt.Topic)
	}
	return total + len(pkt.Payload)
}