-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathparser.go
154 lines (132 loc) · 3.61 KB
/
parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (C) 2018 Kun Zhong All rights reserved.
// Use of this source code is governed by the Apache License, Version 2.0 (the "License").
package grpcx
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/pigogo/grpcx/codec"
"github.com/pigogo/grpcx/compresser"
xlog "github.com/pigogo/grpcx/grpclog"
)
// Constants describing the fixed-size head that starts every frame.
const (
	// headMagic is the value expected in byte 0 of the fixed head;
	// parseAndRecvMsg rejects frames whose first byte differs.
	headMagic byte = 0x66

	// headerLen is the size in bytes of the fixed head: magic (1 byte),
	// compression type (1 byte), big-endian uint16 header length (2 bytes)
	// and big-endian uint32 body length (4 bytes).
	headerLen = 8
)
// parseAndRecvMsg reads one frame from reader and returns its decoded
// PackHeader and its (optionally decompressed) body.
//
// Frame layout, as consumed below:
//
//	byte 0      magic (headMagic)
//	byte 1      compression type
//	bytes 2-3   big-endian uint16 length of the marshalled PackHeader
//	bytes 4-7   big-endian uint32 length of the body
//	...         marshalled PackHeader, then the body
//
// A frame is rejected when the magic does not match, the header length is
// zero, header+body exceed maxReceiveMessageSize, or checkRecvPayload refuses
// the compression type for dc.
//
// rusedBuffer selects zero-copy mode: when it is non-nil (i.e. any value was
// passed, true or false) and the body is not decompressed, msg may alias the
// reader's internal buffer and is then only valid until the next read on
// reader. Bodies larger than the reader's buffer are always copied into a
// freshly allocated slice, because bufio.Reader.Peek cannot serve them.
//
// On error the results computed so far (possibly a non-nil header or a
// partially filled msg) are returned alongside err.
func parseAndRecvMsg(reader *bufio.Reader, dc compresser.Decompressor, maxReceiveMessageSize int, rusedBuffer ...bool) (header *PackHeader, msg []byte, err error) {
	var (
		netHeader []byte
		rawHeader []byte
	)

	netHeader, err = reader.Peek(headerLen)
	if err != nil {
		return header, msg, err
	}

	if netHeader[0] != headMagic {
		err = fmt.Errorf("grpcx: received invalid message: invalid magic")
		xlog.Errorln(err)
		return header, msg, err
	}

	// netHeader aliases the reader's internal buffer and stops being valid
	// once the reader is advanced or refilled, so copy every field we need
	// out of it before the first Discard.
	compType := compresser.CompType(netHeader[1])
	hlen := int(binary.BigEndian.Uint16(netHeader[2:4]))
	bodyLen := binary.BigEndian.Uint32(netHeader[4:8])

	if hlen == 0 {
		err = fmt.Errorf("grpcx: received invalid message: zero head len")
		xlog.Errorln(err)
		return header, msg, err
	}

	// Compare in int64 so a large uint32 body length cannot wrap a 32-bit int.
	total := int64(hlen) + int64(bodyLen)
	if total > int64(maxReceiveMessageSize) {
		err = fmt.Errorf("grpcx: received message larger than max (%d vs. %d)", total, maxReceiveMessageSize)
		xlog.Errorln(err)
		return header, msg, err
	}
	// Safe: bodyLen <= total <= maxReceiveMessageSize, which is an int.
	blen := int(bodyLen)

	err = checkRecvPayload(compType, dc)
	if err != nil {
		xlog.Errorf("grpcx: checkRecvPayload fail:%v", err)
		return header, msg, err
	}

	// The fixed head was peeked successfully above, so it is fully buffered
	// and Discard cannot fail here.
	reader.Discard(headerLen)

	// NOTE: Peek requires the marshalled header to fit in the reader's buffer.
	rawHeader, err = reader.Peek(hlen)
	if err != nil {
		xlog.Errorf("grpcx: peek header fail:%v", err)
		return header, msg, err
	}

	// parse header
	header = new(PackHeader)
	err = header.Unmarshal(rawHeader)
	if err != nil {
		xlog.Errorf("grpcx: header unmarshal fail:%v", err)
		return header, msg, err
	}
	reader.Discard(hlen)

	doDecompress := dc != nil && compType != compresser.CompTypeNone

	// Read the body. Borrow the reader's buffer when the bytes are consumed
	// immediately (decompression) or the caller asked for buffer reuse, but
	// only if the body fits in that buffer; otherwise copy it out.
	borrow := doDecompress || rusedBuffer != nil
	if borrow && blen <= reader.Size() {
		msg, err = reader.Peek(blen)
		if err == nil {
			// Only advance once the whole body is known to be buffered.
			_, err = reader.Discard(blen)
		}
	} else {
		msg = make([]byte, blen)
		_, err = io.ReadFull(reader, msg)
	}

	// fail to get body
	if err != nil {
		xlog.Errorf("grpcx: peek body fail:%v", err)
		return header, msg, err
	}

	if doDecompress && len(msg) > 0 {
		msg, err = dc.Do(bytes.NewReader(msg))
		if err != nil {
			xlog.Errorf("grpcx: body decode fail:%v", err)
			return header, msg, err
		}
	}
	return header, msg, err
}
// encodeNetmsg serialises header and msg into cbuf as a single frame and
// returns cbuf's contents.
//
// The frame is the headerLen-byte fixed head (magic, compression type,
// big-endian uint16 header length, big-endian uint32 body length), followed
// by the marshalled header and the body. msg is marshalled with cd when it is
// non-nil; the body is compressed with cmp only when both cmp and msg are
// non-nil and cmp.Type() is not CompTypeNone.
//
// cbuf is assumed to be empty on entry: the length fields are patched at
// fixed offsets 2 and 4 of cbuf.Bytes(), and the compressed body length is
// derived from cbuf.Len(). cmp.Do is expected to append the compressed body
// to cbuf (TODO confirm against the compresser package).
//
// An error is returned when marshalling or compression fails, when
// header+body exceed maxSendMsgSize, or when either length does not fit its
// wire field (uint16 for the header, uint32 for the body). On error cbuf may
// already contain partial output.
func encodeNetmsg(cd codec.Codec, cmp compresser.Compressor, header *PackHeader, msg interface{}, cbuf *bytes.Buffer, maxSendMsgSize int) ([]byte, error) {
	const (
		maxHeadLen = 1<<16 - 1 // largest value the uint16 header-length field can carry
		maxBodyLen = 1<<32 - 1 // largest value the uint32 body-length field can carry
	)

	hbuf, err := header.Marshal()
	if err != nil {
		xlog.Errorf("grpcx: header marshal fail:%v", err)
		return nil, err
	}
	hlen := len(hbuf)
	// Refuse to emit a frame whose header length would be silently truncated
	// by the uint16 conversion below.
	if hlen > maxHeadLen {
		xlog.Errorf("grpcx: send header size overflow:%v max:%v", hlen, maxHeadLen)
		return nil, fmt.Errorf("grpcx: header size overflow")
	}

	var msgBuf []byte
	if msg != nil {
		msgBuf, err = cd.Marshal(msg)
		if err != nil {
			xlog.Errorf("grpcx: body marshal fail:%v", err)
			return nil, err
		}
	}

	ct := compresser.CompTypeNone
	if cmp != nil && msg != nil {
		ct = cmp.Type()
	}
	compressed := ct != compresser.CompTypeNone

	// Write the fixed head with zeroed length fields; they are patched once
	// the final body length is known.
	head := [headerLen]byte{headMagic, byte(ct)}
	cbuf.Write(head[:])
	cbuf.Write(hbuf)

	blen := len(msgBuf)
	if compressed {
		if err = cmp.Do(cbuf, msgBuf); err != nil {
			xlog.Errorf("grpcx: body encompress fail:%v", err)
			return nil, err
		}
		// The body length on the wire is the compressed size.
		blen = cbuf.Len() - headerLen - hlen
	}

	if hlen+blen > maxSendMsgSize {
		xlog.Errorf("grpcx: send message size overflow:%v max:%v", hlen+blen, maxSendMsgSize)
		return nil, fmt.Errorf("grpcx: message size overflow")
	}
	// Same truncation guard for the uint32 body-length field.
	if uint64(blen) > maxBodyLen {
		xlog.Errorf("grpcx: send body size overflow:%v max:%v", blen, uint64(maxBodyLen))
		return nil, fmt.Errorf("grpcx: message size overflow")
	}

	if !compressed {
		cbuf.Write(msgBuf)
	}

	out := cbuf.Bytes()
	binary.BigEndian.PutUint16(out[2:], uint16(hlen))
	binary.BigEndian.PutUint32(out[4:], uint32(blen))
	return out, nil
}