forked from absolute8511/go-zanredisdb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
types.go
209 lines (188 loc) · 5.79 KB
/
types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package zanredisdb
import (
"errors"
"strings"
"time"
)
var (
FailedOnClusterChanged = "ERR_CLUSTER_CHANGED"
FailedOnNotLeader = "E_FAILED_ON_NOT_LEADER"
FailedOnNotWritable = "E_FAILED_ON_NOT_WRITABLE"
FailedOnNodeStopped = "the node stopped"
errNoNodeForPartition = errors.New("no partition node")
errNoConnForHost = errors.New("no any connection for host")
defaultGetConnTimeoutForLargeKey = time.Millisecond * 50
)
const (
defaultMaxValueSize = 1023 * 1024
defaultLargeKeyConnPoolMinSize = 1
defaultManyArgsNum = 1024
)
func IsTimeoutErr(err error) bool {
if err != nil {
return strings.Contains(strings.ToLower(err.Error()), "i/o timeout")
}
return false
}
func IsConnectRefused(err error) bool {
if err != nil {
return strings.Contains(strings.ToLower(err.Error()), "connection refused")
}
return false
}
func IsConnectClosed(err error) bool {
if err != nil {
return strings.Contains(strings.ToLower(err.Error()), "use of closed network")
}
return false
}
func IsFailedOnClusterChanged(err error) bool {
if err != nil {
return strings.HasPrefix(err.Error(), FailedOnClusterChanged) ||
err == errNoNodeForPartition ||
strings.Contains(err.Error(), FailedOnNodeStopped)
}
return false
}
func IsFailedOnNotWritable(err error) bool {
if err != nil {
return strings.HasPrefix(err.Error(), FailedOnNotWritable)
}
return false
}
type RemoteClusterConf struct {
LookupList []string
IsPrimary bool
ClusterDC string
}
type MultiClusterConf []RemoteClusterConf
func (mcc MultiClusterConf) CheckValid() error {
primaryCnt := 0
for _, c := range mcc {
if c.IsPrimary {
primaryCnt++
}
if len(c.LookupList) == 0 {
return errors.New("cluster lookup list should not be empty")
}
if c.ClusterDC == "" {
return errors.New("multi clusters conf should have cluster dc info")
}
}
if primaryCnt > 1 {
return errors.New("primary cluster should be unique")
}
if primaryCnt != 1 {
return errors.New("missing primary cluster")
}
return nil
}
// This configuration will be used to isolate the large key write and exception key access.
// Write a value large than max allowed size will return error and for
// MaxAllowedValueSize > value > MaxAllowedValueSize/2, a isolated pool with only MinPoolSize connection will be used
// MaxAllowedValueSize/2 > value > MaxAllowedValueSize/4, a isolated pool with only 2*MinPoolSize connections will be used
// MaxAllowedValueSize/4 > value > MaxAllowedValueSize/8, a isolated pool with only 4*MinPoolSize connections will be used
// for command with more than 1024 arguments will use the isolated pool with only MinPoolSize connection
// for exception command will use the isolated pool with only MinPoolSize connection
type LargeKeyConf struct {
MinPoolSize int
GetConnTimeoutForLargeKey time.Duration
MaxAllowedValueSize int
}
func NewLargeKeyConf() *LargeKeyConf {
return &LargeKeyConf{
MaxAllowedValueSize: defaultMaxValueSize,
GetConnTimeoutForLargeKey: defaultGetConnTimeoutForLargeKey,
MinPoolSize: defaultLargeKeyConnPoolMinSize,
}
}
type Conf struct {
LookupList []string
// multi conf and lookuplist should not be used both
MultiConf MultiClusterConf
DialTimeout time.Duration
ReadTimeout time.Duration
RangeReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
MaxConnWait time.Duration
MaxRetryGetConn int
MaxActiveConn int
// idle num that will be kept for all idle connections
MaxIdleConn int
// default 0.4
RangeConnRatio float64
TendInterval int64
Namespace string
Password string
// the datacenter info for client
// will be used for a single cluster acrossing datacenter
DC string
}
func NewDefaultConf() *Conf {
return &Conf{
DialTimeout: time.Second * 3,
ReadTimeout: time.Second * 5,
RangeReadTimeout: time.Second * 10,
IdleTimeout: time.Second * 60,
MaxConnWait: defaultWaitConnTimeout,
MaxRetryGetConn: 3,
MaxActiveConn: 50,
MaxIdleConn: 10,
TendInterval: 3,
}
}
func (conf *Conf) CheckValid() error {
if len(conf.LookupList) > 0 && len(conf.MultiConf) > 0 {
return errors.New("configure invalid: should not use both LookupList and MultiConf")
}
if len(conf.LookupList) == 0 && len(conf.MultiConf) == 0 {
return errors.New("configure invalid: lookup list and multiconf can not both be empty")
}
if len(conf.MultiConf) > 0 {
return conf.MultiConf.CheckValid()
}
if conf.TendInterval <= 0 {
return errors.New("tend interval should be great than zero")
}
return nil
}
// api data response type
type node struct {
BroadcastAddress string `json:"broadcast_address"`
Hostname string `json:"hostname"`
RedisPort string `json:"redis_port"`
HTTPPort string `json:"http_port"`
GrpcPort string `json:"grpc_port"`
Version string `json:"version"`
DCInfo string `json:"dc_info"`
}
type PartitionNodeInfo struct {
Leader node `json:"leader"`
Replicas []node `json:"replicas"`
}
type queryNamespaceResp struct {
Epoch int64 `json:"epoch"`
EngType string `json:"eng_type"`
Partitions map[int]PartitionNodeInfo `json:"partitions"`
PartitionNum int `json:"partition_num"`
}
type NodeInfo struct {
RegID uint64
ID string
NodeIP string
Hostname string
RedisPort string
HttpPort string
RpcPort string
RaftTransportAddr string
Version string
Tags map[string]bool
DataRoot string
RsyncModule string
Epoch int64
}
type listPDResp struct {
PDNodes []NodeInfo `json:"pdnodes"`
PDLeader NodeInfo `json:"pdleader"`
}